flink-user mailing list archives

From 刘宇宝 <liuyu...@yingmi.cn>
Subject how to hold a stream until another stream is drained?
Date Mon, 06 Apr 2020 10:48:54 GMT
I'm using JDBCInputFormat to read a snapshot of a MySQL table, and FlinkKafkaConsumer to read
the binlog, which is written to Kafka by Debezium.

DataStream binlogStream = env.addSource(new FlinkKafkaConsumer(…));
DataStream snapshotStream = env.createInput(JDBCInputFormat.buildJDBCInputFormat()….);

// map() converts the two streams into the same type: (action, fields…), where action is
// "insert", "update" or "delete". The action for "snapshotStream" is always "insert".
// (A placeholder definition of this type follows the snippet.)
DataStream tableStream = binlogStream.map(…).union(snapshotStream.map(…));

tableStream.print();
env.execute("example");
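
For concreteness, the unified type could be something like this placeholder (the class and
field names are mine, nothing fixed):

import java.io.Serializable;
import java.util.Map;

// Placeholder unified type for the union; names are made up for illustration.
public class RowChange implements Serializable {
    public String action;               // "insert", "update" or "delete"
    public Map<String, Object> fields;  // column name -> value

    public RowChange() {}               // no-arg constructor so Flink treats it as a POJO

    public RowChange(String action, Map<String, Object> fields) {
        this.action = action;
        this.fields = fields;
    }
}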


  1.  To make sure "tableStream" doesn't miss any row, "binlogStream" must connect to
Kafka first, so that binlog consumption starts before the table snapshot is taken. I can
roughly achieve this with "myKafkaConsumer.setStartFromTimestamp(System.currentTimeMillis() - 600 * 1000)".
  2.  To make sure changes from "binlogStream" always overwrite those from "snapshotStream",
I need a way to hold "binlogStream" back until "snapshotStream" is drained, so that all
changes from "binlogStream" come after all changes from "snapshotStream". How can I achieve
this? (A rough sketch of what I mean follows this list.)
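
What I have in mind for 2. is roughly the sketch below. It is purely hypothetical: it uses
the RowChange placeholder from above, buffers binlog records in a plain in-memory list, and
relies on a "snapshot-end" marker record that I would have to append to the snapshot stream
myself.

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.util.Collector;

// Hypothetical sketch: hold binlog records back until the snapshot side
// delivers an end marker, then flush them. The buffer is a plain field,
// not checkpointed state, so it is lost on failure -- one of the things
// I don't know how to get right.
public class HoldBinlogUntilSnapshotDrained
        extends CoProcessFunction<RowChange, RowChange, RowChange> {

    private final List<RowChange> held = new ArrayList<>();
    private boolean snapshotDone = false;

    @Override
    public void processElement1(RowChange binlog, Context ctx,
                                Collector<RowChange> out) {
        if (snapshotDone) {
            out.collect(binlog);   // snapshot drained, pass binlog through
        } else {
            held.add(binlog);      // hold binlog back
        }
    }

    @Override
    public void processElement2(RowChange snapshot, Context ctx,
                                Collector<RowChange> out) {
        if ("snapshot-end".equals(snapshot.action)) {  // hypothetical marker
            snapshotDone = true;
            for (RowChange r : held) {
                out.collect(r);    // release everything held back
            }
            held.clear();
        } else {
            out.collect(snapshot);
        }
    }
}

It would be wired up as "binlogStream.map(…).connect(snapshotStream.map(…)).process(new
HoldBinlogUntilSnapshotDrained()).setParallelism(1)" -- parallelism 1 because a single marker
record would only reach one subtask otherwise, which is another thing I'm unsure about.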

I'm considering a wrapper SourceFunction that combines FlinkKafkaConsumer and JDBCInputFormat
(sketched below), but the two differ in parallelism and checkpointing, so I'm not sure how to
get the wrapper right, or even whether it's the right direction.
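
The naive wrapper I'm picturing looks roughly like this. Again purely hypothetical (table
name, topic name and connection settings are placeholders): it runs the JDBC query itself and
then polls Kafka with the plain client, so it gives up JDBCInputFormat's split parallelism
and FlinkKafkaConsumer's checkpointed offsets, which is exactly what worries me.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.Statement;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;

// Hypothetical sketch: emit the whole snapshot first, then consume the
// binlog topic forever. No Flink checkpointing of Kafka offsets, and
// effectively parallelism 1 -- the two problems mentioned above.
public class SnapshotThenBinlogSource implements SourceFunction<RowChange> {

    private volatile boolean running = true;

    @Override
    public void run(SourceContext<RowChange> ctx) throws Exception {
        // Phase 1: drain the MySQL snapshot.
        try (Connection conn = DriverManager.getConnection("jdbc:mysql://…");
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery("SELECT * FROM my_table")) {
            ResultSetMetaData md = rs.getMetaData();
            while (rs.next()) {
                Map<String, Object> fields = new HashMap<>();
                for (int i = 1; i <= md.getColumnCount(); i++) {
                    fields.put(md.getColumnLabel(i), rs.getObject(i));
                }
                ctx.collect(new RowChange("insert", fields));
            }
        }
        // Phase 2: consume the binlog topic, unbounded.
        Properties props = new Properties();
        props.put("bootstrap.servers", "…");
        props.put("group.id", "…");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("my-binlog-topic"));
            while (running) {
                for (ConsumerRecord<byte[], byte[]> rec :
                        consumer.poll(Duration.ofMillis(500))) {
                    ctx.collect(parseDebezium(rec));   // hypothetical helper
                }
            }
        }
    }

    // Hypothetical: turn a Debezium JSON payload into a RowChange (elided).
    private RowChange parseDebezium(ConsumerRecord<byte[], byte[]> rec) {
        throw new UnsupportedOperationException("parse Debezium payload here");
    }

    @Override
    public void cancel() { running = false; }
}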

Any suggestion would be much appreciated!
