From: "Kyle Tinker (JIRA)"
Date: Wed, 31 Jan 2018 22:59:00 +0000 (UTC)
Subject: [jira] [Created] (KAFKA-6512) Java Producer: Excessive memory usage with compression enabled

Kyle Tinker created KAFKA-6512:
----------------------------------

             Summary: Java Producer: Excessive memory usage with compression enabled
                 Key: KAFKA-6512
                 URL: https://issues.apache.org/jira/browse/KAFKA-6512
             Project: Kafka
          Issue Type: Bug
          Components: clients
    Affects Versions: 1.0.0
         Environment: Windows 10
            Reporter: Kyle Tinker
         Attachments: KafkaSender.java
h2. User Story

As a user of the Java producer, I want predictable memory usage from the Kafka client so that I can ensure my system is sized appropriately and will be stable even under heavy usage.

As a user of the Java producer, I want a smaller memory footprint so that my systems don't consume as many resources.

h2. Acceptance Criteria

* Enabling compression in Kafka should not significantly increase the memory usage of Kafka.
* The memory usage of Kafka's Java producer should be roughly in line with the buffer size (buffer.memory) and the number of producers declared.

h2. Additional Information

I've observed high memory usage in the producer when enabling compression (gzip or lz4). I don't observe the behavior with compression off, but with it on I'll run out of heap (2GB). Using a Java profiler, I see the data is in the KafkaLZ4BlockOutputStream (or the related class for gzip). I see that MemoryRecordsBuilder:closeForRecordAppends() is trying to deal with this, but is not successful. I'm most likely network bottlenecked, so I expect the producer buffers to be full while the job is running, and potentially a lot of unacknowledged records.

I've tried using the default buffer.memory with 20 producers (across 20 threads) and sending data as quickly as I can. I've also tried 1MB of buffer.memory, which seemed to reduce memory consumption, but I could still run OOM in certain cases. I have max.in.flight.requests.per.connection set to 1. In short, I should only have ~20 MB (20 * 1MB) of data in buffers, but I can easily exhaust 2000 MB used by Kafka.

In looking at the code more, it looks like KafkaLZ4BlockOutputStream doesn't clear the compressedBuffer or buffer when close() is called. In my heap dump, both of those are ~65k each, meaning that each batch is taking up ~148k of space, of which 131k is buffers (buffer.memory=1,000,000 and messages are 1k each until the batch fills).

Kafka tries to manage memory usage by calling MemoryRecordsBuilder:closeForRecordAppends(), which is documented as "Release resources required for record appends (e.g. compression buffers)". However, this method doesn't actually clear those buffers, because KafkaLZ4BlockOutputStream.close() only writes the block and end mark and closes the output stream; it doesn't clear the buffer and compressedBuffer in KafkaLZ4BlockOutputStream. Those stay allocated in RAM until the block is acknowledged by the broker, processed in Sender:handleProduceResponse(), and the batch is deallocated. This memory usage therefore increases, possibly without bound. In my test program, the program died with approximately 345 unprocessed batches per producer (20 producers), despite having max.in.flight.requests.per.connection=1.

h2. Steps to Reproduce

# Create a topic named test with plenty of storage.
# Use a connection with a very fast upload pipe and limited download. This allows the outbound data to go out while acknowledgements are delayed flowing back in.
# Download KafkaSender.java (attached to this ticket); a sketch of its general shape is shown after this list.
# Set line 17 to reference your Kafka broker.
# Run the program with a 1GB Xmx value.
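For reference, here is a minimal sketch of the kind of sender described above. This is not the attached KafkaSender.java itself: the class name, broker address, topic name, and payload are assumptions, but the settings match the report (lz4 compression, 1MB buffer.memory, max.in.flight.requests.per.connection=1, 20 producer threads).

{code:java}
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class KafkaSenderSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "your-broker:9092"); // assumption: point at your broker
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");               // gzip shows the same behavior
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 1_000_000);              // 1MB buffer, as in the report
        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        String payload = new String(new char[1024]).replace('\0', 'x'); // ~1k per message

        // 20 producers across 20 threads, each sending as fast as possible.
        // Only ~20 MB of buffer space is configured in total, so heap usage
        // should stay near that; with compression on it grows far beyond it.
        // Run with -Xmx1g to observe the OOM.
        for (int i = 0; i < 20; i++) {
            new Thread(() -> {
                KafkaProducer<String, String> producer = new KafkaProducer<>(props);
                while (true) {
                    producer.send(new ProducerRecord<>("test", payload));
                }
            }).start();
        }
    }
}
{code}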
h2. Possible solutions

There are a few possible optimizations I can think of:

# We could declare KafkaLZ4BlockOutputStream.buffer and compressedBuffer as non-final and null them in the close() method.
# We could declare MemoryRecordsBuilder.appendStream non-final and null it in the closeForRecordAppends() method (sketched below).
# We could have the ProducerBatch discard the recordsBuilder in closeForRecordAppends(). However, this is likely a bad idea, because the recordsBuilder contains significant metadata that is likely needed after the stream is closed. It is also final.
# We could try to limit the number of unacknowledged batches in flight. This would bound the maximum memory usage but may negatively impact performance.

Fix #1 would only improve the LZ4 path, and not any other algorithms.

Fix #2 would improve all algorithms, compression and otherwise. Of the first three proposed here, it seems the best. It would also involve having to check an appendStreamIsClosed condition in every usage of appendStream within MemoryRecordsBuilder to avoid NPEs.

Fix #4 is likely necessary if we want to bound the maximum memory usage of Kafka. Removing the buffers in Fix #1 or #2 will reduce the memory usage by ~90%, but theoretically there is still no limit.
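To make Fix #2 concrete, here is a rough sketch of the shape the change could take. The class is reduced to the one field involved, and ensureOpenForRecordAppend() is an illustrative name for the guard that every remaining appendStream usage would need; treat this as a sketch under those assumptions, not an actual patch.

{code:java}
import java.io.DataOutputStream;
import java.io.IOException;
import org.apache.kafka.common.KafkaException;

// Sketch of Fix #2, reduced to the field involved; the real
// MemoryRecordsBuilder carries much more state than shown here.
public class MemoryRecordsBuilderSketch {
    private DataOutputStream appendStream; // previously final

    public void closeForRecordAppends() {
        if (appendStream != null) {
            try {
                appendStream.close(); // for LZ4 this writes the final block and end mark
            } catch (IOException e) {
                throw new KafkaException(e);
            }
            // The fix: dropping the reference lets the ~131k of compression
            // buffers be collected while the batch awaits acknowledgement.
            appendStream = null;
        }
    }

    private void ensureOpenForRecordAppend() {
        // Illustrative guard: every remaining use of appendStream would
        // need a check like this to avoid NPEs after close.
        if (appendStream == null)
            throw new IllegalStateException("MemoryRecordsBuilder is closed for record appends");
    }
}
{code}

Fix #1 would be the analogous change inside KafkaLZ4BlockOutputStream.close(): after writing the block and end mark, null out buffer and compressedBuffer instead of holding them until the batch is acknowledged.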