flink-user mailing list archives

From Stephan Ewen <se...@apache.org>
Subject Re: global watermark across multiple kafka consumers
Date Thu, 17 Dec 2015 14:31:01 GMT
@Andrew: Just to make sure that there is no confusion:

Even though every Kafka Source generates a local watermark, these
watermarks are merged when the streams are merged, for example in union()
or keyBy() steps.
The operator that merges streams tracks the current watermark of each
input stream and emits the minimum of them as its own watermark.

That way, most cases do not need any global watermark coordination.

Here is an example:

DataStream<X> kafka1 = env.addSource(new KafkaConsumer("topicA",
...)).setParallelism(2); // assume this has 2 Kafka partitions

DataStream<X> kafka2 = env.addSource(new KafkaConsumer("topicB",
...)).setParallelism(1); // assume this has 1 Kafka partition


Now assume this:

  - Kafka source 1 (subtask 1) emits watermark 17
  - Kafka source 1 (subtask 2) emits watermark 21
  - Kafka source 2 (subtask 1) emits watermark 5

  - The map() after the keyBy receives the watermarks from the two subtasks
of kafka1 and has its watermark at 17 (the min)

  - The union() has the watermark at 5

  - Kafka source 1 (subtask 1) emits watermark 24
  - Kafka source 1 (subtask 2) emits watermark 22
  - Kafka source 2 (subtask 1) emits watermark 27

  - The map() after the keyBy now has the watermark at 22
  - The union has the watermark at 22 as well now
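
The numbers above can be replayed with a small stand-alone sketch. This
is not Flink code: WatermarkMerger is a hypothetical class written only
for illustration, and it assumes that a merging operator keeps the last
watermark seen per input and reports the minimum over all inputs, as
described above.

```java
import java.util.Arrays;

/** Hypothetical helper (not a Flink class): tracks one watermark per input and reports the minimum. */
public class WatermarkMerger {
    private final long[] inputWatermarks;
    private long current = Long.MIN_VALUE;

    public WatermarkMerger(int numInputs) {
        inputWatermarks = new long[numInputs];
        // No input has reported a watermark yet.
        Arrays.fill(inputWatermarks, Long.MIN_VALUE);
    }

    /** Records a watermark from one input and returns the merged watermark afterwards. */
    public long onWatermark(int input, long watermark) {
        // Assumption: a single input's watermark never moves backwards.
        inputWatermarks[input] = Math.max(inputWatermarks[input], watermark);
        long min = Arrays.stream(inputWatermarks).min().getAsLong();
        // The merged watermark only ever advances.
        current = Math.max(current, min);
        return current;
    }

    public long currentWatermark() {
        return current;
    }

    public static void main(String[] args) {
        WatermarkMerger map = new WatermarkMerger(2);   // inputs: the two kafka1 subtasks
        WatermarkMerger union = new WatermarkMerger(3); // inputs: two kafka1 subtasks + one kafka2 subtask

        // First round of watermarks: 17, 21 and 5.
        map.onWatermark(0, 17);
        map.onWatermark(1, 21);
        union.onWatermark(0, 17);
        union.onWatermark(1, 21);
        union.onWatermark(2, 5);
        System.out.println("round 1: map=" + map.currentWatermark()
                + " union=" + union.currentWatermark());

        // Second round of watermarks: 24, 22 and 27.
        map.onWatermark(0, 24);
        map.onWatermark(1, 22);
        union.onWatermark(0, 24);
        union.onWatermark(1, 22);
        union.onWatermark(2, 27);
        System.out.println("round 2: map=" + map.currentWatermark()
                + " union=" + union.currentWatermark());
    }
}
```

Running main should print map=17 union=5 for the first round and
map=22 union=22 for the second, matching the walkthrough. Note that the
union stays at 5 until source 2 advances, because the slowest input
holds the merged watermark back.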

Hope that illustrates Flink's mechanism a bit. Do you think this mechanism
would handle your watermark coordination?


On Wed, Dec 16, 2015 at 11:06 AM, Till Rohrmann <trohrmann@apache.org>
wrote:

> Hi Andrew,
> as far as I know, there is no prescribed way of handling
> this kind of situation. If you want to synchronize the watermark generation
> given a set of KafkaConsumers you need some kind of ground truth.
> This could be, for example, a central registry such as ZooKeeper in which
> you collect the current watermarks of the different consumers. You could
> access ZooKeeper from inside the TimestampExtractor.
> Alternatively, though a bit more hacky, you could exploit the fact that the
> consumer tasks are usually colocated with consumer tasks from different
> topics. This means that you'll have multiple subtasks reading from the
> different Kafka topics running in the same JVM. You could then use class
> variables to synchronize the watermarks. But this assumes that each subtask
> reading the topic t from Kafka is colocated with at least one other subtask
> reading the topic t' from Kafka with t' in T \ {t} and T being the set of
> Kafka topics. Per default this should be the case.
> I'm wondering why you need a global watermark for your Kafka topics. Isn't
> it enough that you have individual watermarks for each topic?
> Cheers,
> Till
> On Tue, Dec 15, 2015 at 4:45 PM, Griess, Andrew <andrew.griess@sap.com>
> wrote:
>> Hi guys,
>> I have a question related to utilizing watermarks with multiple
>> FlinkKafkaConsumer082 instances. The aim is to have a global watermark
>> across multiple Kafka consumers where any message from any Kafka partition
>> would update the same watermark. When testing a simple TimestampExtractor
>> implementation it seems each consumer results in a separate watermark. Is
>> there a prescribed way of handling such a thing that anyone has any
>> experience with?
>> Thanks for your help,
>> Andrew Griess
