flink-issues mailing list archives

From "Austin Cawley-Edwards (Jira)" <j...@apache.org>
Subject [jira] [Comment Edited] (FLINK-10195) RabbitMQ Source With Checkpointing Doesn't Backpressure Correctly
Date Mon, 01 Jun 2020 18:50:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-10195?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17121227#comment-17121227 ]

Austin Cawley-Edwards edited comment on FLINK-10195 at 6/1/20, 6:49 PM:
------------------------------------------------------------------------

Hey all, just bringing this back up as I'm trying to figure out if we could replace the consumer
another way, but it's not looking like it. The only way I've found client-side flow control to
work with RabbitMQ is through setting the prefetch count on the channel. I think that, in combination
with the work done by Luka, we might have a decent solution where we could adjust the [channel's
prefetch count|https://rabbitmq.github.io/rabbitmq-java-client/api/current/com/rabbitmq/client/Channel.html#basicQos(int)] based on the length of the blocking queue, which can be user-configurable.
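For reference, here's a minimal sketch of what setting the prefetch limit with the RabbitMQ Java client looks like. This is not the RMQSource code; the host, queue name, and prefetch value are illustrative.

{code:java}
// Minimal sketch (illustrative host, queue name, and prefetch value).
// With manual acks, basicQos caps the number of unacknowledged deliveries
// the broker keeps in flight on this channel, which is the flow control
// mechanism discussed above.
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

public class PrefetchSketch {
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection()) {
            Channel channel = connection.createChannel();

            // Broker stops pushing once 100 deliveries are unacked on this channel.
            channel.basicQos(100);

            channel.basicConsume("some-queue", /* autoAck */ false,
                new DefaultConsumer(channel) {
                    @Override
                    public void handleDelivery(String consumerTag, Envelope envelope,
                                               AMQP.BasicProperties properties, byte[] body)
                            throws java.io.IOException {
                        // ... hand the message to downstream processing ...
                        channel.basicAck(envelope.getDeliveryTag(), false);
                    }
                });

            Thread.sleep(10_000); // keep the consumer alive briefly for the demo
        }
    }
}
{code}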

 

This performs better than turning the consumer off/on and is very simple to implement. I've
put together a small prototype/playground in Node here ([https://github.com/austince/backpressured-consumer-prototype]),
along with some potential issues. The simplest implementation would be a static prefetch count,
but if we want to tune it and have it update depending on the space left in the buffer, I
think that's possible too.
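As a rough illustration of the dynamic variant, something like the following could tie the prefetch limit to the free space in a bounded blocking queue. This is a hypothetical helper, not the actual RMQSource implementation; the names and the rebalance policy are assumptions.

{code:java}
// Hypothetical helper: re-issues basicQos so the broker never has more
// deliveries in flight than the bounded buffer can still hold.
import com.rabbitmq.client.Channel;

import java.io.IOException;
import java.util.concurrent.BlockingQueue;

public class AdaptivePrefetch {
    private final Channel channel;
    private final BlockingQueue<byte[]> buffer; // user-configurable capacity
    private int currentPrefetch = -1;

    public AdaptivePrefetch(Channel channel, BlockingQueue<byte[]> buffer) {
        this.channel = channel;
        this.buffer = buffer;
    }

    /** Call periodically, e.g. after each batch of acks. */
    public synchronized void rebalance() throws IOException {
        // Never ask the broker for more than the buffer can still hold.
        int target = Math.max(1, buffer.remainingCapacity());
        if (target != currentPrefetch) {
            channel.basicQos(target); // basicQos can be re-issued on a live channel
            currentPrefetch = target;
        }
    }
}
{code}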

 

If we allow the buffer queue length to be user-configurable, I think it would handle the following
tickets as well:
 - FLINK-6885: RMQSource does not support QoS, leading to OOM
 - FLINK-17559: Duplicate of FLINK-6885

 

What do you all think of this proposal?

[~aljoscha]

[~dwysakowicz]

[~senegalo]

 


> RabbitMQ Source With Checkpointing Doesn't Backpressure Correctly
> -----------------------------------------------------------------
>
>                 Key: FLINK-10195
>                 URL: https://issues.apache.org/jira/browse/FLINK-10195
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / RabbitMQ
>    Affects Versions: 1.4.0, 1.5.0, 1.5.1, 1.6.0
>            Reporter: Luka Jurukovski
>            Assignee: Luka Jurukovski
>            Priority: Major
>              Labels: pull-request-available
>          Time Spent: 10m
>  Remaining Estimate: 0h
>
> The connection between the RabbitMQ server and the client does not apply backpressure
> appropriately when auto-acking is disabled. This becomes very problematic when a downstream
> process throttles the data processing to slower than RabbitMQ sends the data to the client.
> The difference in records ends up being stored in Flink's heap space, which grows
> indefinitely (or technically to "Integer Max" deliveries). Looking at RabbitMQ's metrics, the
> number of unacked messages shows a steadily rising sawtooth shape.
> Upon further investigation, it looks like this is due to how the QueueingConsumer works:
> messages are added to the BlockingQueue faster than they are removed and processed,
> resulting in the previously described behavior.
> This may be intended behavior; however, it isn't explicitly obvious in the documentation
> or any of the examples I have seen.
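For illustration, here is a self-contained sketch of the failure mode described above. It is not the RMQSource code; the host, queue name, and class names are illustrative, and the point is simply that with manual acks and no basicQos limit, deliveries pile up in an unbounded in-memory queue whenever the draining thread is slower than the broker.

{code:java}
// Sketch of the failure mode (illustrative names, not RMQSource code):
// the delivery callback fills an unbounded in-memory queue faster than
// the slow drain loop empties it, so heap usage grows without bound.
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.util.concurrent.LinkedBlockingQueue;

public class UnboundedBufferSketch {
    public static void main(String[] args) throws Exception {
        LinkedBlockingQueue<byte[]> buffer = new LinkedBlockingQueue<>(); // no capacity bound

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        // Note: no channel.basicQos(...) call, so the broker pushes as fast as it can.

        channel.basicConsume("some-queue", /* autoAck */ false,
            new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body) {
                    buffer.add(body); // enqueued at broker speed
                }
            });

        // Slow consumer: drains much more slowly than messages arrive,
        // so `buffer` (and the unacked count on the broker) keeps growing.
        while (true) {
            buffer.take();
            Thread.sleep(100); // simulate slow downstream processing
        }
    }
}
{code}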



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
