flink-user-zh mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Jark Wu <imj...@gmail.com>
Subject Re: 如何合并 binlog stream 和 table stream?
Date Tue, 07 Apr 2020 08:29:25 GMT
如果你的作业没有 state,那么 全量和增量部分,可以分成两个独立的作业。
如果你的作业有 state,主要就是设计 state 复用的问题。两个作业的拓扑结构需要确保一致(为了复用
savepoint),一个作业的
source operator 是 jdbc,另一个 source operator 是 kafka。
当运行完 jdbc 作业后,对作业触发一次 savepoint,然后用这个 savepoint
恢复 kafka 作业,可以从 earliest
开始读取(假设作业支持幂等)。

这里的一个问题,主要是如何对 jdbc 作业触发 savepoint,因为 jdbc InputFormat
目前是
bounded,所以读完后整个作业就结束了,就无法进行 savepoint。
所以这里可能需要自己修改下源码,让 jdbc source 永远不要结束,但通过日志或者
metric 或其他方式通知外界数据已经读完(可以开始触发
savepoint)。

希望这些可以帮助到你。

Best.
Jark


On Tue, 7 Apr 2020 at 16:14, 刘宇宝 <liuyubao@yingmi.cn> wrote:

> 我看了下 streamsql 那个 bootstrap 一段,没看懂怎么处理,我现在已经有
mysql table,算是 materialized
> view 了,也有一份
> Kafka 里的 binlog 数据,问题的麻烦在于怎么「先消费 jdbc table, 消费完后,再切换到
kafka 上」,从 flink
> 文档来看,一旦
> 我把 jdbc table 的流和 kafka 的流加入 env 里,env.execute() 之后,两个流就同时往下游发数据了——我期望的是
> jdbc table
> 的流发完了,才开始发 kafka 的流。
>
> 谢谢!
>
> On 2020/4/7, 2:16 PM, "Jark Wu" <imjark@gmail.com> wrote:
>
>     Hi,
>
>     你这里的合并是用join 来做么? 这样的话,会比较耗性能。
>
>     一种做法是先消费 jdbc table, 消费完后,再切换到 kafka 上。这种要求
binlog 是幂等操作的,因为会有多处理一部分的
>     binlog,没法做到 精确地切换到 kafka offset 上。
>
>     另外你也可以参考下 StreamSQL 的 bootstrap 的做法:
>     https://streamsql.io/blog/from-apache-kafka-to-apache-pulsar
>
>     Best,
>     Jark
>
>
>     On Sun, 5 Apr 2020 at 22:48, 刘宇宝 <liuyubao@yingmi.cn> wrote:
>
>     > 大家好,
>     >
>     > 我在用 Debezium 从数据库读取最新的 binlog 写入 Kafka,比如对于
mysql_server.test.tableA
> 有一个
>     > topic “mysql_server.test.tableA”,我需要在 Flink 里实现这样的逻辑:
>     >
>     >
>     >   1.  先连接上 Kafka 开始消费 topic “mysql_server.test.tableA”,确保连接成功,记为
>     > binlog-stream,但是要暂停消费 Kafka;
>     >   2.  用 JDBCInputFormat 读取 test.tableA 到一个 DataStream 里,记为
> table-stream;
>     >   3.  合并两个 streams,消费完 table-stream 后再开始消费 binlog-stream,这样可以确保
> binlog 是
>     > *后*  应用到某个快照表上。
>     >
>     > 问题是我怎么能暂停消费 binlog-stream 呢? 我目前想到的办法是用
flink state 做一个全局状态
>     > startBinlog,初始值为 false:
>     >
>     >   binlog-stream -> waitOperator   ->   sinkOperator
>     >   table-stream -> notifyOperator -> sinkOperator
>     >
>     > 两个流被合并输出到 sinkOperator,waitOperator() 会 while loop阻塞式的检查全局状态,
等
>     > table-stream 消费完(不知道怎么判断消费完了。。。),  notifyOperator
修改全局状态,这样
> binlog-stream
>     > 就能被继续消费了。
>     >
>     > 但由于 kafka consumer 如果长期阻塞不 ack 的话,kafka consumer 会被断开,所以这个做法应该是不行的。
>     >
>     > 请教怎么破?
>     >
>     > 谢谢!
>     >
>     >
>
>
>
Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message