From: Robert Metzger
Date: Thu, 26 Jan 2017 15:01:46 +0100
Subject: Re: Rate-limit processing
To: user@flink.apache.org

Hi Florian,

You can rate-limit the Kafka consumer by implementing a custom DeserializationSchema that sleeps a bit from time to time (or at each deserialization step).
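The suggestion above can be sketched roughly as follows. To stay compilable without Flink on the classpath, this does not implement Flink's actual `DeserializationSchema` interface. `ThrottlingDeserializer`, `recordsPerPause`, `pauseMillis` and the injectable `sleeper` are hypothetical names, and the `delegate` function stands in for whatever parsing your real schema does. In a real job, the same pause logic would go into your schema's `deserialize(byte[])` method. Note that this does not avoid sleeping. It only moves the sleep into the thread that calls the schema.

```java
import java.util.function.Function;
import java.util.function.LongConsumer;

/**
 * Hypothetical wrapper with the shape of a deserialization step (bytes in, record out)
 * that pauses after every {@code recordsPerPause} records.
 */
class ThrottlingDeserializer<T> {
    private final Function<byte[], T> delegate; // the real parsing logic
    private final int recordsPerPause;          // 1 = pause at every deserialization step
    private final long pauseMillis;             // length of each pause
    private final LongConsumer sleeper;         // injectable so pauses can be observed in tests
    private long count;                         // records deserialized so far

    ThrottlingDeserializer(Function<byte[], T> delegate, int recordsPerPause,
                           long pauseMillis, LongConsumer sleeper) {
        if (recordsPerPause < 1) {
            throw new IllegalArgumentException("recordsPerPause must be >= 1");
        }
        this.delegate = delegate;
        this.recordsPerPause = recordsPerPause;
        this.pauseMillis = pauseMillis;
        this.sleeper = sleeper;
    }

    /** Convenience constructor that really sleeps via Thread.sleep. */
    ThrottlingDeserializer(Function<byte[], T> delegate, int recordsPerPause, long pauseMillis) {
        this(delegate, recordsPerPause, pauseMillis, ThrottlingDeserializer::sleepQuietly);
    }

    T deserialize(byte[] message) {
        T value = delegate.apply(message);
        count++;
        if (count % recordsPerPause == 0) {
            sleeper.accept(pauseMillis); // "sleeps a bit from time to time"
        }
        return value;
    }

    private static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag for the caller
        }
    }
}
```

With `recordsPerPause = 1` and `pauseMillis = 250`, the pauses alone cap each instance of the wrapper at roughly four records per second, which matches the 4 requests/sec example in the original question further down.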

On Tue, Jan 24, 2017 at 1:16 PM, Florian König <florian.koenig@micardo.com> wrote:
Hi Till,

Thank you for the very helpful hints. You are right, I already see backpressure. In my case, that’s OK because it throttles the Kafka source. Speaking of which: you mentioned putting the rate-limiting mechanism into the source. How can I do this with a Kafka source? Should I just extend the Producer, or is there a better mechanism to hook into the connector?

Cheers,
Florian


> On 20.01.2017 at 16:58, Till Rohrmann <trohrmann@apache.org> wrote:
>
> Hi Florian,
>
> Any blocking of the user-code thread is generally not a good idea, because checkpointing happens under the very same lock that also guards the user-code invocation. Thus, a checkpoint barrier arriving at the operator can only trigger the checkpoint once the blocking is over. Even worse, if the blocking happens in a downstream operator (not a source), it can cause backpressure. Since the checkpoint barriers flow with the events and are processed in order, the backpressure will then also increase the checkpointing time.
>
> So if you want to limit the rate, you should do it at the sources without blocking the source thread. You could, for example, count how many elements you've emitted in the past second and, if that exceeds your maximum, not emit the next element to downstream operators until some time has passed (this might end up in a busy loop, but it allows the checkpointing to claim the lock).
>
> Cheers,
> Till
>
> On Fri, Jan 20, 2017 at 12:18 PM, Yassine MARZOUGUI <y.marzougui@mindlytix.com> wrote:
> Hi,
>
> You might find this similar thread from the mailing list archive helpful: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/throttled-stream-td6138.html.
>
> Best,
> Yassine
>
> 2017-01-20 10:53 GMT+01:00 Florian König <florian.koenig@micardo.com>:
> Hi,
>
> I need to limit the rate of processing in a Flink stream application. Specifically, the number of items processed in a .map() operation has to stay under a certain maximum per second.
>
> At the moment, I have another .map() operation before the actual processing, which just sleeps for a certain time (e.g., 250 ms for a limit of 4 requests/sec) and returns the item unchanged:
>
> …
>
> public T map(final T value) throws Exception {
>     Thread.sleep(delay);
>     return value;
> }
>
> …
>
> This works as expected, but it is a rather crude approach. Checkpointing the job takes a very long time: minutes for a state of a few kB, whereas other jobs checkpoint in a few milliseconds. I assume that letting the whole thread sleep most of the time interferes with checkpointing, which is not good!
>
> Would using a different synchronization mechanism (e.g., https://google.github.io/guava/releases/19.0/api/docs/index.html?com/google/common/util/concurrent/RateLimiter.html) help to make checkpointing work better?
>
> Or, preferably, is there a mechanism inside Flink that I can use to accomplish the desired rate limiting? I haven’t found anything in the docs.
>
> Cheers,
> Florian
>
>
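Till's suggestion above (count what was emitted in the past second and hold back further elements without sleeping) can be sketched as a standalone model. This is not Flink code. `SlidingWindowRateLimitedEmitter`, `tryAcquire` and the injected clock are hypothetical names. The `checkpointLock` object stands in for the lock a Flink source holds while emitting (in Flink's `SourceFunction` API, the object returned by `SourceContext.getCheckpointLock()`), and the `Consumer` stands in for `SourceContext.collect`. The lock is held only for the emit itself. While the limit is exceeded, the loop spins with `Thread.yield()` outside the lock, so another thread (a checkpoint, in Flink's case) can acquire it in between.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Consumer;
import java.util.function.LongSupplier;

/**
 * Hypothetical model of a rate-limited source loop: at most maxPerSecond elements are
 * emitted within any 1000 ms sliding window, and the thread never sleeps.
 */
class SlidingWindowRateLimitedEmitter<T> {
    private static final long WINDOW_MILLIS = 1000;

    private final int maxPerSecond;
    private final LongSupplier clockMillis;   // e.g. System::currentTimeMillis in a real loop
    private final Object checkpointLock;      // stand-in for the source's checkpoint lock
    private final Deque<Long> recentEmits = new ArrayDeque<>(); // timestamps of emits in the past second

    SlidingWindowRateLimitedEmitter(int maxPerSecond, LongSupplier clockMillis, Object checkpointLock) {
        if (maxPerSecond < 1) {
            throw new IllegalArgumentException("maxPerSecond must be >= 1");
        }
        this.maxPerSecond = maxPerSecond;
        this.clockMillis = clockMillis;
        this.checkpointLock = checkpointLock;
    }

    /** Returns true (and records the emit) if one more element fits into the past second. */
    boolean tryAcquire() {
        long now = clockMillis.getAsLong();
        // Forget emits that are at least one second old.
        while (!recentEmits.isEmpty() && now - recentEmits.peekFirst() >= WINDOW_MILLIS) {
            recentEmits.pollFirst();
        }
        if (recentEmits.size() < maxPerSecond) {
            recentEmits.addLast(now);
            return true;
        }
        return false;
    }

    /** Emits every element of input, holding the lock only for the emit itself. */
    void run(Iterable<T> input, Consumer<T> emit) {
        for (T element : input) {
            while (!tryAcquire()) {
                Thread.yield(); // busy loop outside the lock, so a checkpoint can take it now
            }
            synchronized (checkpointLock) {
                emit.accept(element);
            }
        }
    }
}
```

A queue of recent timestamps is one way to implement "emitted in the past second". A plain counter reset at fixed one-second boundaries is simpler, but it can let up to twice the limit through within one second around a boundary. As Till notes, the waiting loop burns CPU while the limit is exceeded.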


