From: Till Rohrmann
To: user@flink.apache.org
Date: Wed, 16 Dec 2015 11:06:07 +0100
Subject: Re: global watermark across multiple kafka consumers

Hi Andrew,

as far as I know, there is no prescribed way of handling this kind of situation. If you want to synchronize watermark generation across a set of Kafka consumers, you need some kind of ground truth.

This could be, for example, a central registry such as ZooKeeper in which you collect the current watermarks of the different consumers. You could access ZooKeeper from inside the TimestampExtractor.
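
A minimal sketch of the registry idea, assuming timestamps arrive in ascending order within each consumer so that a consumer's highest timestamp can serve as its local watermark. `WatermarkRegistry` and `GlobalWatermarkTracker` are hypothetical names, not Flink or ZooKeeper API, and a `ConcurrentHashMap` stands in for ZooKeeper so the example runs on its own. The per-consumer watermarks are combined by taking their minimum, the same rule Flink applies when an operator receives watermarks from several inputs, and the global value is held back until every expected consumer has reported at least once.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for a central registry such as ZooKeeper. In a real
// job, publish() would write one entry per consumer and globalWatermark()
// would read them all; a ConcurrentHashMap plays that role here.
class WatermarkRegistry {
    private final Set<String> expectedConsumers;
    private final Map<String, Long> watermarks = new ConcurrentHashMap<>();

    WatermarkRegistry(String... expectedConsumers) {
        if (expectedConsumers.length == 0) {
            throw new IllegalArgumentException("need at least one consumer");
        }
        this.expectedConsumers = new HashSet<>(Arrays.asList(expectedConsumers));
    }

    // Record a consumer's local watermark; max() keeps it from moving backwards.
    void publish(String consumerId, long watermark) {
        watermarks.merge(consumerId, watermark, Math::max);
    }

    // Global watermark = minimum over all expected consumers. Until every
    // consumer has reported once, it stays at Long.MIN_VALUE.
    long globalWatermark() {
        long min = Long.MAX_VALUE;
        for (String id : expectedConsumers) {
            Long wm = watermarks.get(id);
            if (wm == null) {
                return Long.MIN_VALUE;
            }
            min = Math.min(min, wm);
        }
        return min;
    }
}

// Hypothetical per-consumer helper, meant to be driven from the timestamp
// extractor: it tracks the highest timestamp seen locally (assumed ascending)
// and reports the global watermark instead of the local one.
class GlobalWatermarkTracker {
    private final String consumerId;
    private final WatermarkRegistry registry;
    private long maxTimestamp = Long.MIN_VALUE;

    GlobalWatermarkTracker(String consumerId, WatermarkRegistry registry) {
        this.consumerId = consumerId;
        this.registry = registry;
    }

    // Called for every record; returns the timestamp unchanged.
    long onElement(long eventTimestamp) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
        return eventTimestamp;
    }

    // Called whenever a watermark is due: publish local progress, return global.
    long currentWatermark() {
        if (maxTimestamp != Long.MIN_VALUE) {
            registry.publish(consumerId, maxTimestamp);
        }
        return registry.globalWatermark();
    }
}
```

With real ZooKeeper behind it, each call to `publish` and `globalWatermark` becomes a network round trip, so how often watermarks are computed directly determines the load on the registry.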

Alternatively, though a bit more hacky, you could exploit the fact that consumer tasks are usually colocated with consumer tasks reading other topics. This means that you'll have multiple subtasks reading from different Kafka topics running in the same JVM. You could then use class (static) variables to synchronize the watermarks. But this assumes that each subtask reading topic t from Kafka is colocated with at least one other subtask reading a topic t' from Kafka, with t' in T \ {t} and T being the set of Kafka topics. By default, this should be the case.
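
A minimal sketch of the class-variable approach, under the same ascending-timestamp assumption. `JvmSharedWatermark` is a hypothetical name, not Flink API. Static fields are shared per class loader, so this assumes the colocated subtasks load the class through the same one; the minimum it returns covers only the subtasks registered in that JVM, which is why the approach depends on each JVM hosting subtasks of more than one topic.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical sketch of the "class variable" approach. All instances loaded
// by the same class loader in one JVM share the static map below, so
// colocated subtasks reading different topics see each other's progress.
// Subtasks in other JVMs are invisible to it.
class JvmSharedWatermark {
    // Shared across instances: subtask id -> highest timestamp seen so far.
    private static final ConcurrentMap<String, Long> LOCAL_MAX = new ConcurrentHashMap<>();

    private final String subtaskId;

    JvmSharedWatermark(String subtaskId) {
        this.subtaskId = subtaskId;
        // Register right away so a subtask that has not seen data yet
        // holds the shared watermark back.
        LOCAL_MAX.putIfAbsent(subtaskId, Long.MIN_VALUE);
    }

    // Called for every record (timestamps assumed ascending per subtask).
    void onElement(long eventTimestamp) {
        LOCAL_MAX.merge(subtaskId, eventTimestamp, Math::max);
    }

    // Minimum over every subtask currently registered in this JVM.
    long currentWatermark() {
        long min = Long.MAX_VALUE;
        for (long ts : LOCAL_MAX.values()) {
            min = Math.min(min, ts);
        }
        // An empty map (all subtasks closed) yields no watermark progress.
        return LOCAL_MAX.isEmpty() ? Long.MIN_VALUE : min;
    }

    // Deregister on shutdown so a finished subtask stops holding others back.
    void close() {
        LOCAL_MAX.remove(subtaskId);
    }
}
```

Registering in the constructor is a deliberate choice: a subtask that has not yet received data still pins the shared minimum, rather than letting the other topics run ahead of it.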

I'm wondering why you need a global watermark for your Kafka topics. Isn't it enough to have individual watermarks for each topic?

Cheers,
Till

On Tue, Dec 15, 2015 at 4:45 PM, Griess, Andrew <andrew.griess@sap.com> wrote:
> Hi guys,
>
> I have a question related to using watermarks with multiple FlinkKafkaConsumer082 instances. The aim is to have a global watermark across multiple Kafka consumers, where any message from any Kafka partition would update the same watermark. When testing a simple TimestampExtractor implementation, it seems each consumer results in a separate watermark. Is there a prescribed way of handling such a thing that anyone has experience with?
>
> Thanks for your help,
>
> Andrew Griess

