kafka-jira mailing list archives

From "Rajini Sivaram (JIRA)" <j...@apache.org>
Subject [jira] [Resolved] (KAFKA-6512) Java Producer: Excessive memory usage with compression enabled
Date Thu, 15 Feb 2018 18:03:00 GMT

     [ https://issues.apache.org/jira/browse/KAFKA-6512?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Rajini Sivaram resolved KAFKA-6512.
       Resolution: Fixed
    Fix Version/s:     (was: 1.2.0)

Implemented options 1) and 2) from the description.

> Java Producer: Excessive memory usage with compression enabled
> --------------------------------------------------------------
>                 Key: KAFKA-6512
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6512
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients
>    Affects Versions: 1.0.0
>         Environment: Windows 10
>            Reporter: Kyle Tinker
>            Assignee: Rajini Sivaram
>            Priority: Major
>             Fix For: 1.1.0
>         Attachments: KafkaSender.java
> h2. User Story
> As a user of the Java producer, I want predictable memory usage for the Kafka client
so that I can ensure that my system is sized appropriately and will be stable even under heavy load.
> As a user of the Java producer, I want a smaller memory footprint so that my systems
don't consume as many resources.
> h2. Acceptance Criteria
>  * Enabling compression in Kafka should not significantly increase the memory usage of the client.
>  * The memory usage of Kafka's Java Producer should be roughly in line with the buffer
size (buffer.memory) and the number of producers declared.
> h2. Additional Information
> I've observed high memory usage in the producer when enabling compression (gzip or lz4). 
I don't observe the behavior with compression off, but with it on I'll run out of heap (2GB). 
Using a Java profiler, I see the data is in the KafkaLZ4BlockOutputStream (or related class
for gzip).   I see that MemoryRecordsBuilder:closeForRecordAppends() is trying to deal with
this, but is not successful.  I'm most likely network bottlenecked, so I expect the producer
buffers to be full while the job is running and potentially a lot of unacknowledged records.
> I've tried using the default buffer.memory with 20 producers (across 20 threads) and
sending data as quickly as I can.  I've also tried 1MB of buffer.memory, which seemed to
reduce memory consumption, but I could still run out of memory in certain cases.  I have max.in.flight.requests.per.connection
set to 1.  In short, I should only have ~20 MB (20 * 1MB) of data in buffers, but Kafka can easily
exhaust 2000 MB of heap.
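> The configuration described above can be sketched as a plain java.util.Properties fragment. The keys are standard Kafka producer config names taken from this report; the bootstrap address is an illustrative placeholder, not from this ticket:

```java
import java.util.Properties;

public class ProducerConfigSketch {
    // Builds the producer settings described in this report.
    public static Properties reportedConfig() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "broker:9092"); // placeholder address
        props.setProperty("compression.type", "lz4");          // gzip shows the same behavior
        props.setProperty("buffer.memory", "1000000");         // 1 MB per producer, as tested
        props.setProperty("max.in.flight.requests.per.connection", "1");
        return props;
    }

    public static void main(String[] args) {
        // With 20 producers, these settings suggest ~20 MB of record buffers total.
        System.out.println(reportedConfig().getProperty("buffer.memory"));
    }
}
```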
> In looking at the code more, it looks like the KafkaLZ4BlockOutputStream doesn't clear
the compressedBuffer or buffer when close() is called.  In my heap dump, both of those are
~65k size each, meaning that each batch is taking up ~148k of space, of which 131k is buffers.
(buffer.memory=1,000,000 and messages are 1k each until the batch fills).
> Kafka tries to manage memory usage by calling MemoryRecordsBuilder:closeForRecordAppends(),
which is documented as "Release resources required for record appends (e.g. compression buffers)". 
However, this method doesn't actually clear those buffers because KafkaLZ4BlockOutputStream.close()
only writes the block and end mark and closes the output stream.  It doesn't actually clear
the buffer and compressedBuffer in KafkaLZ4BlockOutputStream.  Those stay allocated in RAM
until the block is acknowledged by the broker, processed in Sender:handleProduceResponse(),
and the batch is deallocated.  This memory usage therefore increases, possibly without bound. 
In my test program, the program died with approximately 345 unprocessed batches per producer
(20 producers), despite having max.in.flight.requests.per.connection=1.
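> A back-of-the-envelope check with the figures from the heap dump above roughly accounts for the heap exhaustion:

```java
public class BatchMemoryEstimate {
    public static void main(String[] args) {
        long perBatchBytes = 148_000;   // ~148k per batch, ~131k of it compression buffers
        long unackedBatches = 345;      // unprocessed batches per producer at failure
        long producers = 20;
        long totalBytes = perBatchBytes * unackedBatches * producers;
        // ~1.02 GB of retained batches: consistent with dying under a small heap
        System.out.println(totalBytes);
    }
}
```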
> h2. Steps to Reproduce
>  # Create a topic test with plenty of storage
>  # Use a connection with a very fast upload pipe and limited download.  This allows
the outbound data to go out, but acknowledgements to be delayed flowing in.
>  # Download KafkaSender.java (attached to this ticket)
>  # Set line 17 to reference your Kafka broker
>  # Run the program with a 1GB Xmx value
> h2. Possible solutions
> There are a few possible optimizations I can think of:
>  # We could declare KafkaLZ4BlockOutputStream.buffer and compressedBuffer as non-final
and null them in the close() method
>  # We could declare the MemoryRecordsBuilder.appendStream non-final and null it in the
closeForRecordAppends() method
>  # We could have the ProducerBatch discard the recordsBuilder in closeForRecordAppends(),
however, this is likely a bad idea because the recordsBuilder contains significant metadata
that is likely needed after the stream is closed.  It is also final.
>  # We could try to limit the number of non-acknowledged batches in flight.  This would
bound the maximum memory usage but may negatively impact performance.
> Fix #1 would only improve the LZ4 algorithm, and not any other algorithms.
> Fix #2 would improve all algorithms, compression and otherwise.  Of the first three proposed
here, it seems the best.  It would also involve checking appendStreamIsClosed in every
usage of appendStream within MemoryRecordsBuilder to avoid NPEs.
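> A minimal, self-contained sketch of the pattern behind fixes #1 and #2 (this is not the actual Kafka code; the class and field names only mirror KafkaLZ4BlockOutputStream): declare the buffers non-final and null them in close(), so the GC can reclaim them even while the enclosing batch stays referenced until the broker acknowledges it.

```java
import java.io.IOException;
import java.io.OutputStream;

// Illustration only: the buffers become non-final so close() can drop them.
class BufferReleasingOutputStream extends OutputStream {
    private byte[] buffer = new byte[64 * 1024];            // was final in the reported code
    private byte[] compressedBuffer = new byte[64 * 1024];  // was final in the reported code
    private boolean closed = false;

    @Override
    public void write(int b) throws IOException {
        if (closed) throw new IOException("stream closed");
        // a real implementation would buffer and compress here
    }

    @Override
    public void close() {
        // a real implementation would flush the last block and end mark first
        buffer = null;            // make the ~64k arrays GC-eligible immediately
        compressedBuffer = null;
        closed = true;
    }

    // Bytes still pinned by this stream; drops to 0 after close().
    long retainedBufferBytes() {
        long n = 0;
        if (buffer != null) n += buffer.length;
        if (compressedBuffer != null) n += compressedBuffer.length;
        return n;
    }
}
```

With this pattern, a closed-but-unacknowledged batch retains only its small metadata rather than ~131k of dead compression buffers.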
> Fix #4 is likely necessary if we want to bound the maximum memory usage of Kafka.  Removing
the buffers in Fix 1 or 2 will reduce the memory usage by ~90%, but theoretically there is
still no limit.

This message was sent by Atlassian JIRA
