kafka-dev mailing list archives

From Ben Stopford <...@confluent.io>
Subject Re: [DISCUSS] KAFKA-2063 Add possibility to bound fetch response size (was Re: [DISCUSS] Optimise memory used by replication process by using adaptive fetch message size)
Date Fri, 29 Jul 2016 18:59:03 GMT
Thanks for kicking this one off, Andrey. Generally it looks great!

I left a comment on the Jira about whether we should remove the existing limitBytes, along
with a potential alternative to randomisation.

B
> On 29 Jul 2016, at 09:17, Andrey L. Neporada <aneporada@yandex-team.ru> wrote:
> 
> Hi all!
> 
> I would like to get your feedback on the PR for KAFKA-2063.
> Looks like a KIP is needed there, but it would be nice to get feedback first.
> 
> Thanks,
> Andrey.
> 
> 
>> On 22 Jul 2016, at 12:26, Andrey L. Neporada <aneporada@yandex-team.ru> wrote:
>> 
>> Hi!
>> 
>> Thanks for the feedback - I agree that the proper way to fix this issue is to provide
>> a per-request data limit.
>> Will try to do it.
>> 
>> Thanks,
>> Andrey.
>> 
>> 
>> 
>>> On 21 Jul 2016, at 18:57, Jay Kreps <jay@confluent.io> wrote:
>>> 
>>> I think the memory usage for consumers can be improved a lot, but I think
>>> there may be a better way than what you are proposing.
>>> 
>>> The problem is exactly what you describe: the bound the user sets is
>>> per-partition, but the number of partitions may be quite high. The consumer
>>> could provide a bound on the response size by only requesting a subset of
>>> the partitions, but this would mean that if there was no data available on
>>> those partitions the consumer wouldn't be checking other partitions, which
>>> would add latency.
>>> 
>>> I think the solution is to add a new "max response size" parameter to the
>>> fetch request so the server checks all partitions but doesn't send back
>>> more than this amount in total. This has to be done carefully to ensure
>>> fairness (i.e. if one partition has unbounded amounts of data it shouldn't
>>> indefinitely starve other partitions).
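>>> 
>>> A rough sketch of what I mean (illustration only - readLog(),
>>> maxResponseBytes and requestCount are made-up names, not actual broker
>>> code): fill the response under a total byte budget while rotating the
>>> partition order between requests for fairness.
>>> 
>>> import java.util.*;
>>> import org.apache.kafka.common.TopicPartition;
>>> import org.apache.kafka.common.record.Records;
>>> 
>>> Map<TopicPartition, Records> buildFetchResponse(List<TopicPartition> requested,
>>>                                                 long maxResponseBytes,
>>>                                                 long perPartitionLimit,
>>>                                                 int requestCount) {
>>>     List<TopicPartition> order = new ArrayList<>(requested);
>>>     // Rotate the starting partition on each request so one partition with
>>>     // unbounded data cannot indefinitely starve the others.
>>>     Collections.rotate(order, -(requestCount % order.size()));
>>> 
>>>     Map<TopicPartition, Records> response = new LinkedHashMap<>();
>>>     long remaining = maxResponseBytes;
>>>     for (TopicPartition tp : order) {
>>>         if (remaining <= 0)
>>>             break;
>>>         // Read at most the smaller of the per-partition limit and what is
>>>         // left of the total budget.
>>>         Records records = readLog(tp, Math.min(remaining, perPartitionLimit));
>>>         remaining -= records.sizeInBytes();
>>>         response.put(tp, records);
>>>     }
>>>     return response;
>>> }
>>> 
>>> (A real implementation would also need to always return at least one
>>> message, so that a message larger than the budget can still make progress.)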
>>> 
>>> This will fix memory management both in the replicas and for consumers.
>>> 
>>> There is a JIRA for this: https://issues.apache.org/jira/browse/KAFKA-2063
>>> 
>>> I think it isn't too hard to do and would be a huge aid to the memory
>>> profile of both the clients and server.
>>> 
>>> I also don't think there is much use in setting a max size that expands
>>> dynamically: since you have to be able to support the maximum in any case,
>>> you might as well always use it rather than expanding and contracting
>>> dynamically. That is, if your max fetch response size is 64MB you need to
>>> budget 64MB of free memory, so making it smaller some of the time doesn't
>>> really help you.
>>> 
>>> -Jay
>>> 
>>> On Thu, Jul 21, 2016 at 2:49 AM, Andrey L. Neporada <
>>> aneporada@yandex-team.ru> wrote:
>>> 
>>>> Hi all!
>>>> 
>>>> We noticed that our Kafka cluster uses a lot of memory for replication.
>>>> Our Kafka usage pattern is as follows:
>>>> 
>>>> 1. Most messages are small (tens or hundreds of kilobytes at most), but some
>>>> (rare) messages can be several megabytes. So we have to set
>>>> replica.fetch.max.bytes = max.message.bytes = 8MB.
>>>> 2. Each Kafka broker handles several thousand partitions from multiple
>>>> topics.
>>>> 
>>>> In this scenario, the total memory required for replication (i.e.
>>>> replica.fetch.max.bytes * numOfPartitions) is unreasonably large.
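>>>> For illustration: with replica.fetch.max.bytes = 8MB and, say, 4000
>>>> partitions per broker, that is a worst-case buffer budget of
>>>> 8MB * 4000 = 32GB.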
>>>> 
>>>> So we would like to propose the following approach to fix this problem:
>>>> 
>>>> 1. Introduce a new config parameter, replica.fetch.base.bytes, which is the
>>>> initial size of a replication data chunk. By default this parameter should be
>>>> equal to replica.fetch.max.bytes, so the replication process will work as
>>>> before.
>>>> 
>>>> 2. If the ReplicaFetcherThread fails when trying to replicate a message
>>>> bigger than the current replication chunk, we double the chunk size (capped
>>>> at replica.fetch.max.bytes) and retry.
>>>> 
>>>> 3. If the chunk is replicated successfully, we shrink the replication chunk
>>>> back to replica.fetch.base.bytes (see the sketch below).
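>>>> 
>>>> A minimal sketch of this loop (illustration only - processFetch() and
>>>> the exception name are placeholders, not the actual PR code):
>>>> 
>>>> void replicationLoop(TopicPartition partition) {
>>>>     long fetchSize = replicaFetchBaseBytes;  // start with the base chunk
>>>>     while (isRunning()) {
>>>>         try {
>>>>             processFetch(partition, fetchSize);
>>>>             // Step 3: after a successful fetch, shrink back to the base size.
>>>>             fetchSize = replicaFetchBaseBytes;
>>>>         } catch (MessageTooLargeException e) {
>>>>             // Step 2: the message did not fit; double the chunk size,
>>>>             // capped at replica.fetch.max.bytes, and retry.
>>>>             fetchSize = Math.min(2 * fetchSize, replicaFetchMaxBytes);
>>>>         }
>>>>     }
>>>> }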
>>>> 
>>>> 
>>>> By choosing replica.fetch.base.bytes optimally (in our case ~200K),
>>>> we were able to significantly decrease memory usage without any noticeable
>>>> impact on replication efficiency.
>>>> 
>>>> Here is JIRA ticket (with PR):
>>>> https://issues.apache.org/jira/browse/KAFKA-3979
>>>> 
>>>> Your comments and feedback are highly appreciated!
>>>> 
>>>> 
>>>> Thanks,
>>>> Andrey.
>> 
> 

