flink-user mailing list archives

From Fabian Hueske <fhue...@gmail.com>
Subject Re: how to hold a stream until another stream is drained?
Date Mon, 06 Apr 2020 20:45:17 GMT
Hi,

With Flink streaming operators, you cannot control the order in which records from different inputs are consumed.

However, these parts are currently being reworked to enable a better
integration of batch and streaming use cases (or hybrid use cases such as
yours).
A while back, we wrote a blog post about these plans [1]:

> "Unified Stream Operators: Blink extends the Flink streaming runtime
> operator model to support selectively reading from different inputs, while
> keeping the push model for very low latency. This control over the inputs
> helps to now support algorithms like hybrid hash-joins on the same operator
> and threading model as continuous symmetric joins through RocksDB. These
> operators also form the basis for future features like “Side Inputs”
> <https://cwiki.apache.org/confluence/display/FLINK/FLIP-17+Side+Inputs+for+DataStream+API>."

I'm not familiar with the internal details here, but I found the
InputSelectable [2] interface, which looks like it would do what you are
looking for.
Note that this interface is not exposed in the higher-level DataStream API;
it is used at the lower StreamOperator level.
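As a rough illustration of what input selection would give you here, the following plain-Java model (not Flink code) mimics an operator that reads only from its first input until that input has ended, and only then switches to the second. In Flink, this role would presumably be played by a two-input StreamOperator implementing InputSelectable that keeps choosing the first input until it learns that input is finished. DrainFirstMerger, Selection and merge are hypothetical names used only for this sketch.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/**
 * Plain-Java model (NOT the Flink API) of a two-input operator that uses
 * input selection: it reads input 1 until input 1 has ended, then input 2.
 * All names here are hypothetical.
 */
public class DrainFirstMerger {

    /** Which input the operator wants to read next. */
    enum Selection { FIRST, SECOND }

    private boolean firstEnded = false;

    /** Stay on input 1 until it has ended, then switch to input 2. */
    Selection nextSelection() {
        return firstEnded ? Selection.SECOND : Selection.FIRST;
    }

    /** End-of-input notification for input 1. */
    void endFirstInput() {
        firstEnded = true;
    }

    /**
     * Drives the model: asks which input to read, pulls one record from it,
     * and appends it to the output until both inputs are exhausted.
     * Input 2 is never read while input 1 still has records, even though
     * both inputs have data available from the start.
     */
    public static <T> List<T> merge(List<T> first, List<T> second) {
        DrainFirstMerger op = new DrainFirstMerger();
        Iterator<T> in1 = first.iterator();
        Iterator<T> in2 = second.iterator();
        List<T> out = new ArrayList<>();
        while (true) {
            if (op.nextSelection() == Selection.FIRST) {
                if (in1.hasNext()) {
                    out.add(in1.next());
                } else {
                    op.endFirstInput(); // input 1 is drained
                }
            } else if (in2.hasNext()) {
                out.add(in2.next());
            } else {
                return out; // both inputs exhausted
            }
        }
    }

    public static void main(String[] args) {
        List<String> snapshot = List.of("snapshot-1", "snapshot-2");
        List<String> binlog = List.of("binlog-1", "binlog-2");
        // prints [snapshot-1, snapshot-2, binlog-1, binlog-2]
        System.out.println(merge(snapshot, binlog));
    }
}
```

Mapped to your job, the snapshot stream would be the first input and the binlog stream the second, so every binlog change is processed after the last snapshot row.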

Best, Fabian

[1] https://flink.apache.org/news/2019/02/13/unified-batch-streaming-blink.html
[2] https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InputSelectable.java

On Mon, 6 Apr 2020 at 12:49, 刘宇宝 <liuyubao@yingmi.cn> wrote:

> I’m using JDBCInputFormat to read a snapshot of a MySQL table and
> FlinkKafkaConsumer to read the binlog, which Debezium writes to Kafka.
>
> DataStream binlogStream = env.addSource(new FlinkKafkaConsumer(…));
> DataStream snapshotStream =
>     env.createInput(JDBCInputFormat.buildJDBCInputFormat()….);
>
> // map() converts both streams into the same type: (action, fields…),
> // where action is “insert”, “update” or “delete”. The action for
> // “snapshotStream” is always “insert”.
> DataStream tableStream = binlogStream.map(…).union(snapshotStream.map(…));
>
> tableStream.print();
> env.execute("example");
>
>    1. To make sure “tableStream” doesn’t miss any row, “binlogStream” must
>    connect to Kafka first so that the binlog starts before the table
>    snapshot. I can roughly achieve this with
>    “myKafkaConsumer.setStartFromTimestamp(System.currentTimeMillis() -
>    600*1000)”.
>    2. To make sure changes from “binlogStream” always overwrite those from
>    “snapshotStream”, I need a way to hold “binlogStream” until
>    “snapshotStream” is drained, so that all changes from “binlogStream”
>    come after the changes from “snapshotStream”. How can I achieve this?
>
> I’m considering a wrapper SourceFunction that combines FlinkKafkaConsumer
> and JDBCInputFormat, but the two differ in parallelism and checkpointing,
> so I’m not sure how to get the wrapper right, or even whether it’s the
> right direction.
>
> Any suggestions would be much appreciated!
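The ordering requirement in point 2 can be modeled in plain Java as a sketch. This is not Flink code and it ignores the parallelism and checkpointing concerns raised above. A hypothetical HoldUntilSnapshotDone applies snapshot rows immediately, buffers binlog changes until it is told that the snapshot input has ended, and then replays them in arrival order, upserting on insert/update and removing on delete. The Change class stands in for the (action, fields…) type produced by map(); it is an assumption, reduced here to a single int key and a string value.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Plain-Java model (NOT Flink code) of holding binlog changes until the
 * snapshot is drained. All names are hypothetical.
 */
public class HoldUntilSnapshotDone {

    /** Stand-in for the common (action, fields...) type. */
    static final class Change {
        final String action; // "insert", "update" or "delete"
        final int key;       // primary key of the row
        final String value;  // row contents, null for deletes

        Change(String action, int key, String value) {
            this.action = action;
            this.key = key;
            this.value = value;
        }
    }

    private final Map<Integer, String> table = new LinkedHashMap<>();
    private final List<Change> heldBinlog = new ArrayList<>();
    private boolean snapshotDone = false;

    /** Snapshot rows are applied immediately. */
    void onSnapshot(Change c) {
        apply(c);
    }

    /** Binlog changes are held back until the snapshot has been drained. */
    void onBinlog(Change c) {
        if (snapshotDone) {
            apply(c);
        } else {
            heldBinlog.add(c);
        }
    }

    /** Called once the snapshot input has ended: replay held changes in order. */
    void onSnapshotEnd() {
        snapshotDone = true;
        for (Change c : heldBinlog) {
            apply(c);
        }
        heldBinlog.clear();
    }

    /** Upsert for insert/update, remove for delete. */
    private void apply(Change c) {
        if (c.action.equals("delete")) {
            table.remove(c.key);
        } else {
            table.put(c.key, c.value);
        }
    }

    /** A binlog update and delete arrive before the snapshot rows they supersede. */
    public static Map<Integer, String> demo() {
        HoldUntilSnapshotDone op = new HoldUntilSnapshotDone();
        op.onBinlog(new Change("update", 1, "new"));   // arrives early, held
        op.onSnapshot(new Change("insert", 1, "old")); // older snapshot row
        op.onSnapshot(new Change("insert", 2, "b"));
        op.onBinlog(new Change("delete", 2, null));    // also held
        op.onSnapshotEnd();                            // replay held binlog
        return op.table;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints {1=new}
    }
}
```

Applying the same events in arrival order without holding would instead leave key 1 at "old" and key 2 removed, i.e. {1=old}. In a Flink job, the held changes would presumably have to live in checkpointed operator state rather than a plain list, which is exactly where the parallelism and checkpointing questions come back in.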