flink-user-zh mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Jingsong Li <jingsongl...@gmail.com>
Subject Re: Re:FlinkSQL Retraction 问题原理咨询
Date Wed, 06 May 2020 04:49:21 GMT
Hi,

> sink 表中没有任何主键或唯一键

这个时候更合理的方式应该是抛出异常,不过实现上可能有些不好搞

> 回撤导致的结果变成 0 ,就会执行 delete , 否则就是update

你理解的完全正确

Best
Jingsong Lee

On Wed, May 6, 2020 at 12:39 PM wanglei2@geekplus.com.cn <
wanglei2@geekplus.com.cn> wrote:

>
> Thanks  Jingsong Lee.
>
> 我用的是 MySQL,sink 表中没有任何主键或唯一键.
> 如果 sink 表设主键或唯一键,确实能达到只保留两条记录的效果。
>
> 我把 flink sql-client 客户端设置 SET execution.result-mode=changelog
> 试验了下,左边标上了是第几条 kafka 消息导致的行为:
>
>  +/- tms_company order_cnt 1 + zhongtong 1 2 - zhongtong 1 2 + yuantong 1
> 3 - yuantong 1 3 + yuantong 2 4 - yuantong 2 4 + yuantong 1 4 + zhongtong 1
>
> 第1条消息:执行一个 INSERT
> 第2条消息:执行了 一个 DELETE, 一个  INSERT
> 第3条消息:执行了一个  INSERT ON DUPLICATE UPDATE
> 第4条消息:执行了两个  INSERT ON DUPLICATE UPDATE
>
>
> 我总结这个逻辑应该是如果 回撤导致的结果变成 0 ,就会执行 delete
, 否则就是    INSERT ON DUPLICATE UPDATE
>
> 不知道我这样理解是否正确。
>
> 谢谢,
> 王磊
>
>
>
>
> wanglei2@geekplus.com.cn
>
> Sender: Jingsong Li
> Send Time: 2020-05-06 11:35
> Receiver: user-zh
> Subject: Re: Re:FlinkSQL Retraction 问题原理咨询
> Hi,
>
> 问题一:删除数据可不单单只是retract stream的功能。upsert
> stream是当下游具有按key覆盖的功能时的特殊优化,除了按key覆盖外,它也需要在上游retract时删除数据,意思是upsert
> stream也有retract的input数据的。JDBC实现的是upsert stream的消费。
>
> 问题二:正确数据应该是:
> 1  {"order_id":1,"tms_company":"zhongtong"}   数据库1条记录: zhongtong 1
> 2  {"order_id":1,"tms_company":"yuantong"}      数据库1条记录: yuantong 1  (
删除
> zhongtong 1)
> 3  {"order_id":2,"tms_company":"yuantong"}     数据库1条记录: yuantong 2  (
> 删除yuantong 1)
> 4  {"order_id":2,"tms_company":"zhongtong"}   数据库2条记录: yuantong 1,
> zhongtong 1    ( 删除yuantong 2)
>
> 你用了什么dialect?是不是mysql?
> Flink JDBC的Mysql用了DUPLICATE KEY UPDATE的语法来更新数据。
> 看起来这个语法在RDS没有建主键或者唯一键时可能不会去覆盖老数据?尝试创建下主键或唯一建?
>
> Best,
> Jingsong Lee
>
> On Wed, May 6, 2020 at 10:36 AM wanglei2@geekplus.com.cn <
> wanglei2@geekplus.com.cn> wrote:
>
> >
> > 更新键是 tms_company, 但这是通过双层的 group 实现了回退功能,总共就
两个 order_id, order_id 对应的
> > tms_company 是有变化的。
> > 我不是很明白这种回退的具体原理,为什么有的会删除,有的就没有删除。
> >
> >
> >
> >
> > wanglei2@geekplus.com.cn
> >
> > 发件人: Michael Ran
> > 发送时间: 2020-04-30 17:23
> > 收件人: user-zh
> > 主题: Re:FlinkSQL Retraction 问题原理咨询
> >
> >
> >
> > 指定的更新键是tms_company?
> >
> >
> > 结果是:
> > yuantong:2
> > zhongtong:2
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> > 在 2020-04-30 17:08:22,"wanglei2@geekplus.com.cn" <
> wanglei2@geekplus.com.cn>
> > 写道:
> > >
> > >自己实现了一下 https://yq.aliyun.com/articles/457392/ 菜鸟物流订单统计的例子,读
kafka 写到
> > RDS, RDS 表没有主键,也没有唯一键。
> > >
> > >INSERT INTO table_out select  tms_company,  count(distinct order_id) as
> > order_cnt from
> > >    (select order_id, LAST_VALUE(tms_company) AS tms_company from
> > dwd_table group by order_id)
> > > group by tms_company;
> > >
> > >
> > >总共发送了 4 条消息,顺序如下:
> > >
> > >1  {"order_id":1,"tms_company":"zhongtong"}   数据库1条记录: zhongtong
1
> > >
> > >2  {"order_id":1,"tms_company":"yuantong"}      数据库1条记录: yuantong
1
> > (上一条记录被删除了)
> > >
> > >3  {"order_id":2,"tms_company":"yuantong"}     数据库2条记录: yuantong
1,
> > yuantong 2  (增加了条记录,没有删除)
> > >
> > >4  {"order_id":2,"tms_company":"zhongtong"}   数据库4条记录: yuantong
1,
> > yuantong 2, yuantong 1, zhongtong 1    (增加了两条记录,没有删除)
> > >
> > >
> > >问题一:
> > >    第 2 条消息发送后,数据库的上一条记录被删除了。我的理解这应该是
RetractStream 的功能。当我看源码
> >
> https://github.com/apache/flink/tree/master/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc
> > 这里也没有 RetractionStream 的实现,哪里的代码把他删除了呢?
> > >
> > >问题二:
> > >   第 3 条记录来了后,直接在数据库增加了 一条 yuantong 2,
为什么没把 yuantong 1, 删除呢?
> > >   第 4 条记录来了后,又在数据库里加了两条记录,为什么也没有删除之前的呢?
> > >
> > >谢谢,
> > >王磊
> > >
> > >
> > >
> > >wanglei2@geekplus.com.cn
> >
>
>
> --
> Best, Jingsong Lee
>


-- 
Best, Jingsong Lee
Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message