kafka-dev mailing list archives

From Sönke Liebau <soenke.lie...@opencore.com.INVALID>
Subject Re: Apache Kafka Memory Leakage???
Date Tue, 05 Mar 2019 10:58:44 GMT
Hi Syed,

the next step would be for someone else to be able to reproduce the issue.

If you could give me the values for the input parameters that your code
runs with (as per my last mail) then I am happy to run this again and take
another look at memory consumption.
I'll then upload the exact code I used to GitHub. Maybe you can run that in
your environment and provide a jconsole screenshot of memory consumption
over time; then we can compare those patterns.
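
If screenshots keep getting lost, a small sampler along the following lines
could log roughly the same heap figures that jconsole plots. This is only a
sketch: the class name HeapSampler, the log format and the default interval
are made up for illustration and are not taken from the code discussed in
this thread; the MemoryMXBean calls are standard java.lang.management API.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;
import java.lang.management.MemoryUsage;
import java.util.Locale;

/** Hypothetical helper: prints heap usage at a fixed interval so two runs can be compared. */
final class HeapSampler {

    private static final MemoryMXBean MEMORY = ManagementFactory.getMemoryMXBean();

    /** Bytes currently occupied on the heap (live objects plus garbage not yet collected). */
    static long usedHeapBytes() {
        return MEMORY.getHeapMemoryUsage().getUsed();
    }

    /** One log line: timestamp in ms, then used and committed heap in MB. */
    static String sampleLine() {
        MemoryUsage heap = MEMORY.getHeapMemoryUsage();
        return String.format(Locale.ROOT, "%d used=%dMB committed=%dMB",
                System.currentTimeMillis(),
                heap.getUsed() / (1024 * 1024),
                heap.getCommitted() / (1024 * 1024));
    }

    public static void main(String[] args) throws InterruptedException {
        // Assumed defaults: 5 samples, one per second; override via arguments.
        int samples = args.length > 0 ? Integer.parseInt(args[0]) : 5;
        long intervalMs = args.length > 1 ? Long.parseLong(args[1]) : 1000L;
        for (int i = 0; i < samples; i++) {
            System.out.println(sampleLine());
            Thread.sleep(intervalMs);
        }
    }
}
```

Running the same loop on a daemon thread next to the consumer, and comparing
the logs from both environments, should show whether used heap keeps climbing
or just follows the usual garbage collection sawtooth.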

Also, could you please upload the image from your last mail to imgur or a
similar service? It seems to have been lost in the mailing list.

Best regards,
Sönke

On Tue, Mar 5, 2019 at 11:48 AM Syed Mudassir Ahmed <
syed.mudassir@gaianconsultants.com> wrote:

> Sonke,
>   I am not blaming apache-kafka for the tickets raised by our customers.
> I am saying there could be an issue in the kafka-clients library causing a
> resource/memory leak.  If that issue is resolved, I can automatically
> resolve my tickets as well.  I don't find any issue with the SnapLogic code.
>   Since I am in touch with the developers of kafka-clients through this
> email, I am looking forward to contributing as much as I can to improve the
> kafka-clients library.
>   What are the next steps to confirm it's a bug in kafka-clients?  And if
> it's a bug, what's the process to get it resolved?
>
> Thanks,
>
>
>
> On Tue, Mar 5, 2019 at 2:43 PM Sönke Liebau
> <soenke.liebau@opencore.com.invalid> wrote:
>
>> Hi Syed,
>>
>> Apache Kafka is open source software that comes as is, without any
>> support attached to it. It may well be that this is a bug in the Kafka
>> client library, though to be honest I doubt that, given what my tests have
>> shown and since I think someone else would have noticed this as well.
>> Even if this is a bug though, there is no obligation on anyone to fix
>> this. Any bugs your customer raised with you are between you and them and
>> nothing to do with Apache Kafka.
>>
>> While I am happy to assist you with this, I, like most people on this
>> list, do this in my spare time, which means that my time to spend
>> on this is limited.
>>
>> That being said, could you please host the image externally somewhere
>> (imgur or something similar)? It doesn't appear to have gone through the
>> list.
>>
>> What input parameters are you using for isSuggest, messageCount and
>> synccommit when you run the code?
>>
>> Best regards,
>> Sönke
>>
>>
>>
>>
>> On Tue, Mar 5, 2019 at 9:14 AM Syed Mudassir Ahmed <
>> syed.mudassir@gaianconsultants.com> wrote:
>>
>>> Sonke,
>>>   This issue seems serious.  Customers raised a bug with our product, and
>>> I suspect the bug is in the apache-kafka clients library.
>>>   I executed the Kafka reader without any SnapLogic-specific code.
>>> There were only about twenty messages in the topics.  The code consumed
>>> about 300MB of memory in about 2 hours.
>>>   Please find attached the screenshot.
>>>   Can we please get on a call and arrive at a conclusion?  I still argue
>>> it's a bug in the kafka-clients library.
>>>
>>> Thanks,
>>>
>>>
>>>
>>> On Mon, Mar 4, 2019 at 8:33 PM Sönke Liebau
>>> <soenke.liebau@opencore.com.invalid> wrote:
>>>
>>>> Hi Syed,
>>>>
>>>> and you are sure that this memory is actually allocated? I still have
>>>> my reservations about that metric, to be honest. Is there any way to
>>>> connect to the process with, for example, jconsole and have a look at
>>>> memory consumption in there?
>>>> Or alternatively, since the code you have sent is not relying on
>>>> SnapLogic anymore, can you just run it as a standalone application and
>>>> check memory consumption?
>>>>
>>>> That code looks very similar to what I ran (without knowing your input
>>>> parameters for isSuggest et al., of course) and for me memory consumption
>>>> stayed between 120 MB and 200 MB.
>>>>
>>>> Best regards,
>>>> Sönke
>>>>
>>>>
>>>> On Mon, Mar 4, 2019 at 1:44 PM Syed Mudassir Ahmed <
>>>> syed.mudassir@gaianconsultants.com> wrote:
>>>>
>>>>> Sonke,
>>>>>   thanks again.
>>>>>   Yes, I replaced the non-kafka code from our end with a simple Sysout
>>>>> statement as follows:
>>>>>
>>>>> do {
>>>>>     ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.of(timeout, ChronoUnit.MILLIS));
>>>>>     for (final ConsumerRecord<byte[], byte[]> record : records) {
>>>>>         if (!infiniteLoop && !oneTimeMode) {
>>>>>             --msgCount;
>>>>>             if (msgCount < 0) {
>>>>>                 break;
>>>>>             }
>>>>>         }
>>>>>         Debugger.doPrint("value read:<" + record.value() + ">");
>>>>>         /*outputViews.write(new BinaryOutput() {
>>>>>             @Override
>>>>>             public Document getHeader() {
>>>>>                 return generateHeader(record, oldHeader);
>>>>>             }
>>>>>
>>>>>             @Override
>>>>>             public void write(WritableByteChannel writeChannel) throws IOException {
>>>>>                 try (OutputStream os = Channels.newOutputStream(writeChannel)) {
>>>>>                     os.write(record.value());
>>>>>                 }
>>>>>             }
>>>>>         });*/
>>>>>         // The offset to commit should be the next offset of the current one,
>>>>>         // according to the API
>>>>>         offsets.put(new TopicPartition(record.topic(), record.partition()),
>>>>>                 new OffsetAndMetadata(record.offset() + 1));
>>>>>         //In suggest mode, we should not change the current offset
>>>>>         if (isSyncCommit && isSuggest) {
>>>>>             commitOffset(offsets);
>>>>>             offsets.clear();
>>>>>         }
>>>>>     }
>>>>> } while ((msgCount > 0 || infiniteLoop) && isRunning.get());
>>>>>
>>>>>
>>>>> Note: Debugger is a wrapper class that just writes the given string
>>>>> to a local file using PrintStream's println() method.
>>>>>
>>>>> And I don't see any difference in the metrics.  I still see the huge
>>>>> amount of memory allocated.
>>>>>
>>>>> See the image attached.
>>>>>
>>>>>
>>>>> Thanks,
>>>>>
>>>>>
>>>>>
>>>>> On Mon, Mar 4, 2019 at 5:17 PM Sönke Liebau
>>>>> <soenke.liebau@opencore.com.invalid> wrote:
>>>>>
>>>>>> Hi Syed,
>>>>>>
>>>>>> let's keep it on the list for now so that everybody can participate :)
>>>>>>
>>>>>> The different .poll() method was just an unrelated observation, the
>>>>>> main points of my mail were the question about whether this is the
>>>>>> correct metric you are looking at and replacing the payload of your
>>>>>> code with a println statement to remove non-Kafka code from your
>>>>>> program and make sure that the leak is not in there. Have you tried
>>>>>> that?
>>>>>>
>>>>>> Best regards,
>>>>>> Sönke
>>>>>>
>>>>>> On Mon, Mar 4, 2019 at 7:21 AM Syed Mudassir Ahmed
>>>>>> <syed.mudassir@gaianconsultants.com> wrote:
>>>>>> >
>>>>>> > Sonke,
>>>>>> >   Thanks so much for the reply.  I used the new version of the
>>>>>> > poll(Duration) method.  Still, I see the memory issue.
>>>>>> >   Is there a way we can get on a one-on-one call and discuss this,
>>>>>> > please?  Let me know your availability.  I can share a Zoom meeting link.
>>>>>> >
>>>>>> > Thanks,
>>>>>> >
>>>>>> >
>>>>>> >
>>>>>> > On Sat, Mar 2, 2019 at 2:15 AM Sönke Liebau
>>>>>> > <soenke.liebau@opencore.com.invalid> wrote:
>>>>>> >>
>>>>>> >> Hi Syed,
>>>>>> >>
>>>>>> >> from your screenshot I assume that you are using SnapLogic to run your
>>>>>> >> code (full disclosure: I do not have the faintest idea of this
>>>>>> >> product!). I've just had a look at the docs and am a bit confused by
>>>>>> >> their explanation of the metric that you point out in your image,
>>>>>> >> "Memory Allocated". The docs say: "The Memory Allocated reflects the
>>>>>> >> number of bytes that were allocated by the Snap.  Note that this
>>>>>> >> number does not reflect the amount of memory that was freed and it is
>>>>>> >> not the peak memory usage of the Snap.  So, it is not necessarily a
>>>>>> >> metric that can be used to estimate the required size of a Snaplex
>>>>>> >> node.  Rather, the number provides an insight into how much memory had
>>>>>> >> to be allocated to process all of the documents.  For example, if the
>>>>>> >> total allocated was 5MB and the Snap processed 32 documents, then the
>>>>>> >> Snap allocated roughly 164KB per document.  When combined with the
>>>>>> >> other statistics, this number can help to identify the potential
>>>>>> >> causes of performance issues."
>>>>>> >> The part about not reflecting memory that was freed makes me somewhat
>>>>>> >> doubtful whether this actually reflects how much memory the process
>>>>>> >> currently holds.  Can you give some more insight there?
>>>>>> >>
>>>>>> >> Apart from that, I just ran your code somewhat modified to make it
>>>>>> >> work without dependencies for 2 hours and saw no unusual memory
>>>>>> >> consumption, just a regular garbage collection sawtooth pattern. That
>>>>>> >> being said, I had to replace your actual processing with a simple
>>>>>> >> println, so if there is a memory leak in there I would of course not
>>>>>> >> have noticed.
>>>>>> >> I've uploaded the code I ran [1] for reference. For further analysis,
>>>>>> >> maybe you could run something similar with just a println or noop and
>>>>>> >> see if the symptoms persist, to localize the leak (if it exists).
>>>>>> >>
>>>>>> >> Also, two random observations on your code:
>>>>>> >>
>>>>>> >> KafkaConsumer.poll(long timeout) is deprecated; you should consider
>>>>>> >> using the overloaded version with a Duration parameter instead.
>>>>>> >>
>>>>>> >> The comment at [2] seems to contradict the following code, as the
>>>>>> >> offsets are only changed when in suggest mode. But as I have no idea
>>>>>> >> what suggest mode even is or what all this means, this observation may
>>>>>> >> be miles off the point :)
>>>>>> >>
>>>>>> >> I hope that helps a little.
>>>>>> >>
>>>>>> >> Best regards,
>>>>>> >> Sönke
>>>>>> >>
>>>>>> >> [1] https://gist.github.com/soenkeliebau/e77e8665a1e7e49ade9ec27a6696e983
>>>>>> >> [2] https://gist.github.com/soenkeliebau/e77e8665a1e7e49ade9ec27a6696e983#file-memoryleak-java-L86
>>>>>> >>
>>>>>> >>
>>>>>> >> On Fri, Mar 1, 2019 at 7:35 AM Syed Mudassir Ahmed
>>>>>> >> <syed.mudassir@gaianconsultants.com> wrote:
>>>>>> >> >
>>>>>> >> >
>>>>>> >> > Thanks,
>>>>>> >> >
>>>>>> >> >
>>>>>> >> >
>>>>>> >> > ---------- Forwarded message ---------
>>>>>> >> > From: Syed Mudassir Ahmed <syed.mudassir@gaianconsultants.com>
>>>>>> >> > Date: Tue, Feb 26, 2019 at 12:40 PM
>>>>>> >> > Subject: Apache Kafka Memory Leakage???
>>>>>> >> > To: <users@kafka.apache.org>
>>>>>> >> > Cc: Syed Mudassir Ahmed <syed.mudassir@gaianconsultants.com>
>>>>>> >> >
>>>>>> >> >
>>>>>> >> > Hi Team,
>>>>>> >> >   I have a Java application based on the latest Apache Kafka
>>>>>> >> > version, 2.1.1.
>>>>>> >> >   I have a consumer application that runs indefinitely to consume
>>>>>> >> > messages whenever they are produced.
>>>>>> >> >   Sometimes no messages are produced for hours.  Still, I see that
>>>>>> >> > the memory allocated to the consumer program increases drastically.
>>>>>> >> >   My code is as follows:
>>>>>> >> >
>>>>>> >> > AtomicBoolean isRunning = new AtomicBoolean(true);
>>>>>> >> >
>>>>>> >> > Properties kafkaProperties = new Properties();
>>>>>> >> > kafkaProperties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
>>>>>> >> > kafkaProperties.put(ConsumerConfig.GROUP_ID_CONFIG, groupID);
>>>>>> >> > kafkaProperties.put(ConsumerConfig.CLIENT_ID_CONFIG, UUID.randomUUID().toString());
>>>>>> >> > kafkaProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
>>>>>> >> > kafkaProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, AUTO_OFFSET_RESET_EARLIEST);
>>>>>> >> > consumer = new KafkaConsumer<byte[], byte[]>(kafkaProperties, keyDeserializer, valueDeserializer);
>>>>>> >> > if (topics != null) {
>>>>>> >> >     subscribeTopics(topics);
>>>>>> >> > }
>>>>>> >> >
>>>>>> >> >
>>>>>> >> >     boolean infiniteLoop = false;
>>>>>> >> >     boolean oneTimeMode = false;
>>>>>> >> >     int timeout = consumeTimeout;
>>>>>> >> >     if (isSuggest) {
>>>>>> >> >         //Configuration for suggest mode
>>>>>> >> >         oneTimeMode = true;
>>>>>> >> >         msgCount = 0;
>>>>>> >> >         timeout = DEFAULT_CONSUME_TIMEOUT_IN_MS;
>>>>>> >> >     } else if (msgCount < 0) {
>>>>>> >> >         infiniteLoop = true;
>>>>>> >> >     } else if (msgCount == 0) {
>>>>>> >> >         oneTimeMode = true;
>>>>>> >> >     }
>>>>>> >> >     Map<TopicPartition, OffsetAndMetadata> offsets = Maps.newHashMap();
>>>>>> >> >     do {
>>>>>> >> >             ConsumerRecords<byte[], byte[]> records = consumer.poll(timeout);
>>>>>> >> >             for (final ConsumerRecord<byte[], byte[]> record : records) {
>>>>>> >> >                 if (!infiniteLoop && !oneTimeMode) {
>>>>>> >> >                     --msgCount;
>>>>>> >> >                     if (msgCount < 0) {
>>>>>> >> >                         break;
>>>>>> >> >                     }
>>>>>> >> >                 }
>>>>>> >> >                 outputViews.write(new BinaryOutput() {
>>>>>> >> >                     @Override
>>>>>> >> >                     public Document getHeader() {
>>>>>> >> >                         return generateHeader(record, oldHeader);
>>>>>> >> >                     }
>>>>>> >> >
>>>>>> >> >                     @Override
>>>>>> >> >                     public void write(WritableByteChannel writeChannel) throws IOException {
>>>>>> >> >                         try (OutputStream os = Channels.newOutputStream(writeChannel)) {
>>>>>> >> >                             os.write(record.value());
>>>>>> >> >                         }
>>>>>> >> >                     }
>>>>>> >> >                 });
>>>>>> >> >                 // The offset to commit should be the next offset of the current one,
>>>>>> >> >                 // according to the API
>>>>>> >> >                 offsets.put(new TopicPartition(record.topic(), record.partition()),
>>>>>> >> >                         new OffsetAndMetadata(record.offset() + 1));
>>>>>> >> >                 // In suggest mode, we should not change the current offset
>>>>>> >> >                 if (isSyncCommit && isSuggest) {
>>>>>> >> >                     commitOffset(offsets);
>>>>>> >> >                     offsets.clear();
>>>>>> >> >                 }
>>>>>> >> >             }
>>>>>> >> >     } while ((msgCount > 0 || infiniteLoop) && isRunning.get());
>>>>>> >> >
>>>>>> >> >
>>>>>> >> > See the screenshot below.  In about nineteen hours, it consumed just
>>>>>> >> > 5 messages, but the memory allocated is 1.6GB.
>>>>>> >> >
>>>>>> >> >
>>>>>> >> > Any clues on how to get rid of the memory issue?  Is there anything I
>>>>>> >> > need to do in the program, or is it a bug in the Kafka library?
>>>>>> >> >
>>>>>> >> > Please reply ASAP.
>>>>>> >> >
>>>>>> >> >
>>>>>> >> > Thanks,
>>>>>> >> >
>>>>>> >>
>>>>>> >>
>>>>>> >> --
>>>>>> >> Sönke Liebau
>>>>>> >> Partner
>>>>>> >> Tel. +49 179 7940878
>>>>>> >> OpenCore GmbH & Co. KG - Thomas-Mann-Straße 8 - 22880 Wedel - Germany
>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> Sönke Liebau
>>>>>> Partner
>>>>>> Tel. +49 179 7940878
>>>>>> OpenCore GmbH & Co. KG - Thomas-Mann-Straße 8 - 22880 Wedel - Germany
>>>>>>
>>>>>
>>>>
>>>> --
>>>> Sönke Liebau
>>>> Partner
>>>> Tel. +49 179 7940878
>>>> OpenCore GmbH & Co. KG - Thomas-Mann-Straße 8 - 22880 Wedel - Germany
>>>>
>>>
>>
>> --
>> Sönke Liebau
>> Partner
>> Tel. +49 179 7940878
>> OpenCore GmbH & Co. KG - Thomas-Mann-Straße 8 - 22880 Wedel - Germany
>>
>

-- 
Sönke Liebau
Partner
Tel. +49 179 7940878
OpenCore GmbH & Co. KG - Thomas-Mann-Straße 8 - 22880 Wedel - Germany
