flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-6301) Flink KafkaConnector09 leaks memory on reading compressed messages due to a Kafka consumer bug
Date Thu, 20 Jul 2017 05:23:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-6301?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16094192#comment-16094192 ]

ASF GitHub Bot commented on FLINK-6301:
---------------------------------------

Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/4015
  
    I think the version bump is due to the fact that there is broken API behaviour since moving to 0.10.2.1. See #4321 (I think there's some duplicate work here and there).


> Flink KafkaConnector09 leaks memory on reading compressed messages due to a Kafka consumer
bug
> ----------------------------------------------------------------------------------------------
>
>                 Key: FLINK-6301
>                 URL: https://issues.apache.org/jira/browse/FLINK-6301
>             Project: Flink
>          Issue Type: Bug
>          Components: Kafka Connector
>    Affects Versions: 1.2.0, 1.1.3, 1.1.4
>            Reporter: Rahul Yadav
>            Assignee: Rahul Yadav
>             Fix For: 1.2.2, 1.4.0
>
>         Attachments: jeprof.24611.1228.i1228.heap.svg, jeprof.24611.1695.i1695.heap.svg,
jeprof.24611.265.i265.heap.svg, jeprof.24611.3138.i3138.heap.svg, jeprof.24611.595.i595.heap.svg,
jeprof.24611.705.i705.heap.svg, jeprof.24611.81.i81.heap.svg, POSTFIX.jeprof.14880.1944.i1944.heap.svg,
POSTFIX.jeprof.14880.4129.i4129.heap.svg, POSTFIX.jeprof.14880.961.i961.heap.svg, POSTFIX.jeprof.14880.99.i99.heap.svg,
POSTFIX.jeprof.14880.9.i9.heap.svg
>
>
> Hi
> We are running Flink on a standalone cluster with 8 TaskManagers, each with 8 vCPUs and 8 slots. Each host has 16 GB of RAM.
> In our jobs, 
> # We consume gzip-compressed messages from Kafka using *KafkaConnector09* and use the *rocksDB* backend for checkpoint storage (a minimal setup sketch follows the stack trace below).
> # To debug the leak, we used *jemalloc* and *jeprof* to profile the sources of malloc calls from the Java process; attached are the profiles generated at various stages of the job. As we can see, apart from os.malloc and rocksDB.allocateNewBlock, there are additional malloc calls coming from the inflate() method of java.util.zip.Inflater. These calls are innocuous as long as the Inflater.end() method is called after use.
> # To find the sources of the inflate() calls, we used BTrace on the running process to dump the caller stack on each call (a hypothetical BTrace probe is sketched below, after the trace). Following is the stack trace we got:
> {code}
> java.util.zip.Inflater.inflate(Inflater.java)
> java.util.zip.InflaterInputStream.read(InflaterInputStream.java:152)
> java.util.zip.GZIPInputStream.read(GZIPInputStream.java:117)
> java.io.DataInputStream.readFully(DataInputStream.java:195)
> org.apache.kafka.common.record.MemoryRecords$RecordsIterator.makeNext(MemoryRecords.java:253)
> org.apache.kafka.common.record.MemoryRecords$RecordsIterator.makeNext(MemoryRecords.java:210)
> org.apache.kafka.common.utils.AbstractIterator.maybeComputeNext(AbstractIterator.java:79)
> org.apache.kafka.common.utils.AbstractIterator.hasNext(AbstractIterator.java:45)
> org.apache.kafka.common.record.MemoryRecords$RecordsIterator.innerDone(MemoryRecords.java:282)
> org.apache.kafka.common.record.MemoryRecords$RecordsIterator.makeNext(MemoryRecords.java:233)
> org.apache.kafka.common.record.MemoryRecords$RecordsIterator.makeNext(MemoryRecords.java:210)
> org.apache.kafka.common.utils.AbstractIterator.maybeComputeNext(AbstractIterator.java:79)
> org.apache.kafka.common.utils.AbstractIterator.hasNext(AbstractIterator.java:45)
> org.apache.kafka.clients.consumer.internals.Fetcher.handleFetchResponse(Fetcher.java:563)
> org.apache.kafka.clients.consumer.internals.Fetcher.access$000(Fetcher.java:69)
> org.apache.kafka.clients.consumer.internals.Fetcher$1.onSuccess(Fetcher.java:139)
> org.apache.kafka.clients.consumer.internals.Fetcher$1.onSuccess(Fetcher.java:136)
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:380)
> org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:274)
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:320)
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:213)
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:193)
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:908)
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:853)
> org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:227)
> {code}
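> For reference, a minimal sketch of the job setup described in point 1 above; the topic name, bootstrap servers, checkpoint URI and class name below are placeholders, not taken from the actual job:
> {code}
> // Flink 1.2-style job: consume a gzip-compressed Kafka topic with
> // FlinkKafkaConsumer09 and keep checkpoints in the RocksDB state backend.
> import java.util.Properties;
>
> import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
> import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
>
> public class GzipTopicJob {
>     public static void main(String[] args) throws Exception {
>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>         env.enableCheckpointing(60_000); // checkpoint every minute
>         env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints"));
>
>         Properties props = new Properties();
>         props.setProperty("bootstrap.servers", "broker:9092");
>         props.setProperty("group.id", "gzip-consumer");
>         // The topic is produced with compression.type=gzip, so the 0.9 consumer
>         // decompresses each record batch internally via GZIPInputStream/Inflater.
>
>         env.addSource(new FlinkKafkaConsumer09<>("gzip-topic", new SimpleStringSchema(), props))
>            .print();
>
>         env.execute("gzip topic consumer");
>     }
> }
> {code}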
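> A hypothetical BTrace probe of the kind described in point 3 might look like the following (package names assume the classic com.sun.btrace 1.3.x distribution; the actual script may have differed):
> {code}
> import com.sun.btrace.annotations.BTrace;
> import com.sun.btrace.annotations.OnMethod;
>
> import static com.sun.btrace.BTraceUtils.jstack;
> import static com.sun.btrace.BTraceUtils.println;
>
> @BTrace
> public class InflateTracer {
>
>     // Print the caller stack every time any overload of
>     // java.util.zip.Inflater.inflate(...) is entered.
>     @OnMethod(clazz = "java.util.zip.Inflater", method = "inflate")
>     public static void onInflate() {
>         println("Inflater.inflate() called from:");
>         jstack();
>         println("----");
>     }
> }
> {code}
> Such a probe would be attached to the running consumer JVM with, e.g., btrace <pid> InflateTracer.java.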
> The end() method on Inflater is called inside the close() method of *InflaterInputStream* (extended by *GZIPInputStream*), but looking through the Kafka consumer code we found that RecordsIterator does not close the compression stream after use, which causes the memory leak:
> https://github.com/apache/kafka/blob/23c69d62a0cabf06c4db8b338f0ca824dc6d81a7/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java#L210
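> To make the lifecycle explicit, here is a minimal standalone sketch (plain java.util.zip, not Kafka code) contrasting the leaky and the correct usage:
> {code}
> import java.io.ByteArrayInputStream;
> import java.io.ByteArrayOutputStream;
> import java.io.IOException;
> import java.util.zip.GZIPInputStream;
>
> public class InflaterLifecycleSketch {
>
>     // Leaky pattern (what the 0.9.x RecordsIterator effectively does): the
>     // stream is read fully but never closed, so Inflater.end() is never called
>     // and the native zlib buffers linger until the finalizer eventually runs.
>     static byte[] readLeaky(byte[] gzipped) throws IOException {
>         GZIPInputStream in = new GZIPInputStream(new ByteArrayInputStream(gzipped));
>         return readAll(in); // missing in.close()
>     }
>
>     // Correct pattern: try-with-resources guarantees close(), and
>     // InflaterInputStream.close() calls Inflater.end(), releasing the
>     // native memory immediately.
>     static byte[] readClosed(byte[] gzipped) throws IOException {
>         try (GZIPInputStream in = new GZIPInputStream(new ByteArrayInputStream(gzipped))) {
>             return readAll(in);
>         }
>     }
>
>     private static byte[] readAll(GZIPInputStream in) throws IOException {
>         ByteArrayOutputStream out = new ByteArrayOutputStream();
>         byte[] buf = new byte[4096];
>         int n;
>         while ((n = in.read(buf)) != -1) {
>             out.write(buf, 0, n);
>         }
>         return out.toByteArray();
>     }
> }
> {code}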
> https://issues.apache.org/jira/browse/KAFKA-3937 was filed for this and the issue was
fixed in 0.10.1.0 but not back-ported to previous versions.
> So, I would assume that we have two paths from here:
> 1. Wait for the changes to be back-ported to the 0.9.x Kafka consumer and then update the kafka-clients dependency:
> https://github.com/apache/flink/blob/release-1.2/flink-connectors/flink-connector-kafka-0.9/pom.xml#L40
> 2. Update the kafka-connector10 to use 0.10.1.0 clients library instead of 0.10.0.1.
> https://github.com/apache/flink/blob/release-1.2/flink-connectors/flink-connector-kafka-0.10/pom.xml#L40
> Apart from the master, also back-port the changes to 1.2.x for Kafka connector 10 and
all the 1.x dependencies for Kafka connector 09.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
