flink-user-zh mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From 刘宇宝 <liuyu...@yingmi.cn>
Subject Re: 如何合并 binlog stream 和 table stream?
Date Tue, 07 Apr 2020 08:05:24 GMT
没有 join,只是简单的 union:


	DataStream binlogStream = env.addSource(new FlinkKafkaConsumer(…));
	DataStream snapshotStream = env.createInput(JDBCInputFormat.buildJDBCInputFormat()….);
 
	// map() is to convert two streams into same type:  (action,  fields…),  where action is
“insert”, “update”, “delete”.  The action for “snapshotStream” is always “insert”.
	DataStream tableStream = binlogStream.map(…).union(snapshotStream.map(…));
 
	tableStream.print();
	env.execute(“example”);

我希望下游只看到一个流,这个流里先出现 snapshotStream 的所有消息,等这个发完后,再从
binlog 里读取,但是上面的代码段里
我控制不了 snapshotStream和 binlogStream 谁先发完消息。

你说的消费完后再切换到 Kafka 具体怎么做? DataStream 「消费完」这个事件没有暴露
hook 出来,而且好像 DataStream 的 DAG 构造好后不能变了??

Snapshot + binlog 的幂等是可以保证的,binlog 的 insert/update/delete 总是覆盖到
snapshot 之上。

user@flink 邮件列表里有人提到 side input,跟我的需求很像,binlogStream 开一个
side input 读取完 snapshotStream 然后再发自己(binlogStream) 的消息,但是可惜这个功能还没做完。


谢谢!


On 2020/4/7, 2:16 PM, "Jark Wu" <imjark@gmail.com> wrote:

    Hi,
    
    你这里的合并是用join 来做么? 这样的话,会比较耗性能。
    
    一种做法是先消费 jdbc table, 消费完后,再切换到 kafka 上。这种要求
binlog 是幂等操作的,因为会有多处理一部分的
    binlog,没法做到 精确地切换到 kafka offset 上。
    
    另外你也可以参考下 StreamSQL 的 bootstrap 的做法:
    https://streamsql.io/blog/from-apache-kafka-to-apache-pulsar
    
    Best,
    Jark
    
    
    On Sun, 5 Apr 2020 at 22:48, 刘宇宝 <liuyubao@yingmi.cn> wrote:
    
    > 大家好,
    >
    > 我在用 Debezium 从数据库读取最新的 binlog 写入 Kafka,比如对于
mysql_server.test.tableA 有一个
    > topic “mysql_server.test.tableA”,我需要在 Flink 里实现这样的逻辑:
    >
    >
    >   1.  先连接上 Kafka 开始消费 topic “mysql_server.test.tableA”,确保连接成功,记为
    > binlog-stream,但是要暂停消费 Kafka;
    >   2.  用 JDBCInputFormat 读取 test.tableA 到一个 DataStream 里,记为 table-stream;
    >   3.  合并两个 streams,消费完 table-stream 后再开始消费 binlog-stream,这样可以确保
binlog 是
    > *后*  应用到某个快照表上。
    >
    > 问题是我怎么能暂停消费 binlog-stream 呢? 我目前想到的办法是用
flink state 做一个全局状态
    > startBinlog,初始值为 false:
    >
    >   binlog-stream -> waitOperator   ->   sinkOperator
    >   table-stream -> notifyOperator -> sinkOperator
    >
    > 两个流被合并输出到 sinkOperator,waitOperator() 会 while loop阻塞式的检查全局状态,
等
    > table-stream 消费完(不知道怎么判断消费完了。。。),  notifyOperator
修改全局状态,这样 binlog-stream
    > 就能被继续消费了。
    >
    > 但由于 kafka consumer 如果长期阻塞不 ack 的话,kafka consumer 会被断开,所以这个做法应该是不行的。
    >
    > 请教怎么破?
    >
    > 谢谢!
    >
    >
    

Mime
View raw message