From: "ASF GitHub Bot (JIRA)"
To: issues@flink.apache.org
Reply-To: dev@flink.apache.org
Date: Mon, 24 Jul 2017 05:12:00 +0000 (UTC)
Subject: [jira] [Commented] (FLINK-6301) Flink KafkaConnector09 leaks memory on reading compressed messages due to a Kafka consumer bug

    [ https://issues.apache.org/jira/browse/FLINK-6301?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16097955#comment-16097955 ]

ASF GitHub Bot commented on FLINK-6301:
---------------------------------------

Github user vidhu5269 closed the pull request at:

    https://github.com/apache/flink/pull/4015

> Flink KafkaConnector09 leaks memory on reading compressed messages due to a Kafka consumer bug
> ----------------------------------------------------------------------------------------------
>
> Key: FLINK-6301
> URL: https://issues.apache.org/jira/browse/FLINK-6301
> Project: Flink
> Issue Type: Bug
> Components: Kafka Connector
> Affects Versions: 1.2.0, 1.1.3, 1.1.4
> Reporter: Rahul Yadav
> Assignee: Rahul Yadav
> Attachments: jeprof.24611.1228.i1228.heap.svg, jeprof.24611.1695.i1695.heap.svg, jeprof.24611.265.i265.heap.svg, jeprof.24611.3138.i3138.heap.svg, jeprof.24611.595.i595.heap.svg, jeprof.24611.705.i705.heap.svg, jeprof.24611.81.i81.heap.svg, POSTFIX.jeprof.14880.1944.i1944.heap.svg, POSTFIX.jeprof.14880.4129.i4129.heap.svg, POSTFIX.jeprof.14880.961.i961.heap.svg, POSTFIX.jeprof.14880.99.i99.heap.svg, POSTFIX.jeprof.14880.9.i9.heap.svg
>
>
> Hi
> We are running Flink on a standalone cluster with 8 TaskManagers having 8 vCPUs and 8 slots each. Each host has 16 GB of RAM.
> In our jobs,
> # We are consuming gzip-compressed messages from Kafka using *KafkaConnector09* and use the *rocksDB* backend for checkpoint storage.
> # To debug the leak, we used *jemalloc and jeprof* to profile the sources of malloc calls from the Java process; attached are the profiles generated at various stages of the job. As we can see, apart from os.malloc and rocksDB.allocateNewBlock, there are additional malloc calls coming from the inflate() method of java.util.zip.Inflater. These calls are innocuous as long as the Inflater's end() method is called after use.
> # To find the callers of the inflate() method, we used BTrace on the running process to dump the caller stack on each call (see the sketch after the quoted description). Following is the stack trace we got:
> {code}
> java.util.zip.Inflater.inflate(Inflater.java)
> java.util.zip.InflaterInputStream.read(InflaterInputStream.java:152)
> java.util.zip.GZIPInputStream.read(GZIPInputStream.java:117)
> java.io.DataInputStream.readFully(DataInputStream.java:195)
> org.apache.kafka.common.record.MemoryRecords$RecordsIterator.makeNext(MemoryRecords.java:253)
> org.apache.kafka.common.record.MemoryRecords$RecordsIterator.makeNext(MemoryRecords.java:210)
> org.apache.kafka.common.utils.AbstractIterator.maybeComputeNext(AbstractIterator.java:79)
> org.apache.kafka.common.utils.AbstractIterator.hasNext(AbstractIterator.java:45)
> org.apache.kafka.common.record.MemoryRecords$RecordsIterator.innerDone(MemoryRecords.java:282)
> org.apache.kafka.common.record.MemoryRecords$RecordsIterator.makeNext(MemoryRecords.java:233)
> org.apache.kafka.common.record.MemoryRecords$RecordsIterator.makeNext(MemoryRecords.java:210)
> org.apache.kafka.common.utils.AbstractIterator.maybeComputeNext(AbstractIterator.java:79)
> org.apache.kafka.common.utils.AbstractIterator.hasNext(AbstractIterator.java:45)
> org.apache.kafka.clients.consumer.internals.Fetcher.handleFetchResponse(Fetcher.java:563)
> org.apache.kafka.clients.consumer.internals.Fetcher.access$000(Fetcher.java:69)
> org.apache.kafka.clients.consumer.internals.Fetcher$1.onSuccess(Fetcher.java:139)
> org.apache.kafka.clients.consumer.internals.Fetcher$1.onSuccess(Fetcher.java:136)
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:380)
> org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:274)
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:320)
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:213)
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:193)
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:908)
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:853)
> org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:227)
> {code}
> The end() method on Inflater is called inside the close() method of *InflaterInputStream* (extended by *GZIPInputStream*), but looking through the Kafka consumer code, we found that RecordsIterator does not close the compressor stream after use, which causes the memory leak (see the sketch below):
> https://github.com/apache/kafka/blob/23c69d62a0cabf06c4db8b338f0ca824dc6d81a7/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java#L210
> https://issues.apache.org/jira/browse/KAFKA-3937 was filed for this and the issue was fixed in 0.10.1.0, but the fix was not back-ported to previous versions.
> So, I would assume that we have two paths from here:
> 1. Wait for the fix to be back-ported to the 0.9.x Kafka consumer and then update the kafka-clients dependency:
> https://github.com/apache/flink/blob/release-1.2/flink-connectors/flink-connector-kafka-0.9/pom.xml#L40
> 2. Update kafka-connector10 to use the 0.10.1.0 clients library instead of 0.10.0.1:
> https://github.com/apache/flink/blob/release-1.2/flink-connectors/flink-connector-kafka-0.10/pom.xml#L40
> Apart from master, also back-port the changes to 1.2.x for Kafka connector 10 and to all the 1.x dependencies for Kafka connector 09.
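To illustrate the root cause described in the quoted report, here is a minimal, self-contained Java sketch. It is not Kafka's or Flink's actual code (the class and method names are made up for illustration); it only contrasts the leaky pattern, where a GZIPInputStream is drained but never closed so the native zlib memory held by its Inflater lingers until finalization, with the fixed pattern, where try-with-resources closes the stream and InflaterInputStream.close() calls Inflater.end() immediately:

{code}
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.InputStream;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

// Hypothetical stand-alone demo; not Flink or Kafka code.
public class InflaterCloseSketch {

    public static void main(String[] args) throws Exception {
        byte[] compressed = gzip("a gzip-compressed Kafka message payload".getBytes("UTF-8"));

        // Leaky pattern: the stream is drained but never closed, so the native
        // zlib memory held by the underlying Inflater is released only when the
        // object is eventually finalized. Repeated once per compressed message
        // set, this is what makes the process's resident memory grow.
        InputStream leaky = new GZIPInputStream(new ByteArrayInputStream(compressed));
        drain(leaky);
        // missing: leaky.close();

        // Fixed pattern: try-with-resources calls close(), and
        // InflaterInputStream.close() calls Inflater.end(), freeing the
        // native buffers immediately.
        try (InputStream in = new GZIPInputStream(new ByteArrayInputStream(compressed))) {
            drain(in);
        }
    }

    private static byte[] gzip(byte[] data) throws Exception {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(bos)) {
            gz.write(data);
        }
        return bos.toByteArray();
    }

    private static void drain(InputStream in) throws Exception {
        byte[] buf = new byte[4096];
        while (in.read(buf) != -1) {
            // discard the decompressed bytes; only the allocation pattern matters here
        }
    }
}
{code}

The actual KAFKA-3937 fix lives inside the consumer's record iterator rather than a plain stream, but the underlying pattern is the same: whoever creates the decompression stream has to close it.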
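The BTrace step mentioned in the description could look roughly like the following sketch (assuming the 2017-era com.sun.btrace packaging; the class name InflateTracer is made up). It attaches a probe to java.util.zip.Inflater.inflate() and prints the caller stack on every entry, which is how a stack trace like the one quoted above can be captured from a running TaskManager:

{code}
import com.sun.btrace.annotations.BTrace;
import com.sun.btrace.annotations.OnMethod;
import static com.sun.btrace.BTraceUtils.jstack;

// Hypothetical BTrace script: dump the Java stack on every Inflater.inflate() call.
@BTrace
public class InflateTracer {

    @OnMethod(clazz = "java.util.zip.Inflater", method = "inflate")
    public static void onInflate() {
        // Prints the current thread's stack trace to the BTrace client,
        // showing which code path is driving the decompression.
        jstack();
    }
}
{code}

Typically this would be attached to the JVM with something like "btrace <pid> InflateTracer.java" against the TaskManager process.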