flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Javier Lopez <javier.lo...@zalando.de>
Subject Re: Flink memory leak
Date Wed, 08 Nov 2017 13:57:56 GMT
Hi,

You don't need data. With data it will die faster. I tested as well with a
small data set, using the fromElements source, but it will take some time
to die. It's better with some data.

On 8 November 2017 at 14:54, Piotr Nowojski <piotr@data-artisans.com> wrote:

> Hi,
>
> Thanks for sharing this job.
>
> Do I need to feed some data to the Kafka to reproduce this issue with your
> script?
>
> Does this OOM issue also happen when you are not using the Kafka
> source/sink?
>
> Piotrek
>
> On 8 Nov 2017, at 14:08, Javier Lopez <javier.lopez@zalando.de> wrote:
>
> Hi,
>
> This is the test flink job we created to trigger this leak
> https://gist.github.com/javieredo/c6052404dbe6cc602e99f4669a09f7d6
> And this is the python script we are using to execute the job thousands of
> times to get the OOM problem https://gist.github.com/javieredo/
> 4825324d5d5f504e27ca6c004396a107
>
> The cluster we used for this has this configuration:
>
>    - Instance type: t2.large
>    - Number of workers: 2
>    - HeapMemory: 5500
>    - Number of task slots per node: 4
>    - TaskMangMemFraction: 0.5
>    - NumberOfNetworkBuffers: 2000
>
> We have tried several things, increasing the heap, reducing the heap, more
> memory fraction, changes this value in the taskmanager.sh
> "TM_MAX_OFFHEAP_SIZE="2G"; and nothing seems to work.
>
> Thanks for your help.
>
> On 8 November 2017 at 13:26, ÇETİNKAYA EBRU ÇETİNKAYA EBRU <
> b20926247@cs.hacettepe.edu.tr> wrote:
>
>> On 2017-11-08 15:20, Piotr Nowojski wrote:
>>
>>> Hi Ebru and Javier,
>>>
>>> Yes, if you could share this example job it would be helpful.
>>>
>>> Ebru: could you explain in a little more details how does your Job(s)
>>> look like? Could you post some code? If you are just using maps and
>>> filters there shouldn’t be any network transfers involved, aside
>>> from Source and Sink functions.
>>>
>>> Piotrek
>>>
>>> On 8 Nov 2017, at 12:54, ebru <b20926247@cs.hacettepe.edu.tr> wrote:
>>>>
>>>> Hi Javier,
>>>>
>>>> It would be helpful if you share your test job with us.
>>>> Which configurations did you try?
>>>>
>>>> -Ebru
>>>>
>>>> On 8 Nov 2017, at 14:43, Javier Lopez <javier.lopez@zalando.de>
>>>> wrote:
>>>>
>>>> Hi,
>>>>
>>>> We have been facing a similar problem. We have tried some different
>>>> configurations, as proposed in other email thread by Flavio and
>>>> Kien, but it didn't work. We have a workaround similar to the one
>>>> that Flavio has, we restart the taskmanagers once they reach a
>>>> memory threshold. We created a small test to remove all of our
>>>> dependencies and leave only flink native libraries. This test reads
>>>> data from a Kafka topic and writes it back to another topic in
>>>> Kafka. We cancel the job and start another every 5 seconds. After
>>>> ~30 minutes of doing this process, the cluster reaches the OS memory
>>>> limit and dies.
>>>>
>>>> Currently, we have a test cluster with 8 workers and 8 task slots
>>>> per node. We have one job that uses 56 slots, and we cannot execute
>>>> that job 5 times in a row because the whole cluster dies. If you
>>>> want, we can publish our test job.
>>>>
>>>> Regards,
>>>>
>>>> On 8 November 2017 at 11:20, Aljoscha Krettek <aljoscha@apache.org>
>>>> wrote:
>>>>
>>>> @Nico & @Piotr Could you please have a look at this? You both
>>>> recently worked on the network stack and might be most familiar with
>>>> this.
>>>>
>>>> On 8. Nov 2017, at 10:25, Flavio Pompermaier <pompermaier@okkam.it>
>>>> wrote:
>>>>
>>>> We also have the same problem in production. At the moment the
>>>> solution is to restart the entire Flink cluster after every job..
>>>> We've tried to reproduce this problem with a test (see
>>>> https://issues.apache.org/jira/browse/FLINK-7845 [1]) but we don't
>>>>
>>>> know whether the error produced by the test and the leak are
>>>> correlated..
>>>>
>>>> Best,
>>>> Flavio
>>>>
>>>> On Wed, Nov 8, 2017 at 9:51 AM, ÇETİNKAYA EBRU ÇETİNKAYA EBRU
>>>> <b20926247@cs.hacettepe.edu.tr> wrote:
>>>> On 2017-11-07 16:53, Ufuk Celebi wrote:
>>>> Do you use any windowing? If yes, could you please share that code?
>>>> If
>>>> there is no stateful operation at all, it's strange where the list
>>>> state instances are coming from.
>>>>
>>>> On Tue, Nov 7, 2017 at 2:35 PM, ebru <b20926247@cs.hacettepe.edu.tr>
>>>> wrote:
>>>> Hi Ufuk,
>>>>
>>>> We don’t explicitly define any state descriptor. We only use map
>>>> and filters
>>>> operator. We thought that gc handle clearing the flink’s internal
>>>> states.
>>>> So how can we manage the memory if it is always increasing?
>>>>
>>>> - Ebru
>>>>
>>>> On 7 Nov 2017, at 16:23, Ufuk Celebi <uce@apache.org> wrote:
>>>>
>>>> Hey Ebru, the memory usage might be increasing as long as a job is
>>>> running.
>>>> This is expected (also in the case of multiple running jobs). The
>>>> screenshots are not helpful in that regard. :-(
>>>>
>>>> What kind of stateful operations are you using? Depending on your
>>>> use case,
>>>> you have to manually call `clear()` on the state instance in order
>>>> to
>>>> release the managed state.
>>>>
>>>> Best,
>>>>
>>>> Ufuk
>>>>
>>>> On Tue, Nov 7, 2017 at 12:43 PM, ebru
>>>> <b20926247@cs.hacettepe.edu.tr> wrote:
>>>>
>>>> Begin forwarded message:
>>>>
>>>> From: ebru <b20926247@cs.hacettepe.edu.tr>
>>>> Subject: Re: Flink memory leak
>>>> Date: 7 November 2017 at 14:09:17 GMT+3
>>>> To: Ufuk Celebi <uce@apache.org>
>>>>
>>>> Hi Ufuk,
>>>>
>>>> There are there snapshots of htop output.
>>>> 1. snapshot is initial state.
>>>> 2. snapshot is after submitted one job.
>>>> 3. Snapshot is the output of the one job with 15000 EPS. And the
>>>> memory
>>>> usage is always increasing over time.
>>>>
>>>> <1.png><2.png><3.png>
>>>>
>>>> On 7 Nov 2017, at 13:34, Ufuk Celebi <uce@apache.org> wrote:
>>>>
>>>> Hey Ebru,
>>>>
>>>> let me pull in Aljoscha (CC'd) who might have an idea what's causing
>>>> this.
>>>>
>>>> Since multiple jobs are running, it will be hard to understand to
>>>> which job the state descriptors from the heap snapshot belong to.
>>>> - Is it possible to isolate the problem and reproduce the behaviour
>>>> with only a single job?
>>>>
>>>> – Ufuk
>>>>
>>>> On Tue, Nov 7, 2017 at 10:27 AM, ÇETİNKAYA EBRU ÇETİNKAYA EBRU
>>>> <b20926247@cs.hacettepe.edu.tr> wrote:
>>>>
>>>> Hi,
>>>>
>>>> We are using Flink 1.3.1 in production, we have one job manager and
>>>> 3 task
>>>> managers in standalone mode. Recently, we've noticed that we have
>>>> memory
>>>> related problems. We use docker container to serve Flink cluster. We
>>>> have
>>>> 300 slots and 20 jobs are running with parallelism of 10. Also the
>>>> job
>>>> count
>>>> may be change over time. Taskmanager memory usage always increases.
>>>> After
>>>> job cancelation this memory usage doesn't decrease. We've tried to
>>>> investigate the problem and we've got the task manager jvm heap
>>>> snapshot.
>>>> According to the jam heap analysis, possible memory leak was Flink
>>>> list
>>>> state descriptor. But we are not sure that is the cause of our
>>>> memory
>>>> problem. How can we solve the problem?
>>>>
>>>> We have two types of Flink job. One has no state full operator
>>>> contains only maps and filters and the other has time window with
>>>> count trigger.
>>>>
>>>  * We've analysed the jvm heaps again in different conditions. First
>>> we analysed the snapshot when no flink jobs running on cluster. (image
>>> 1)
>>> * Then, we analysed the jvm heap snapshot when the flink job that has
>>> no state full operator is running. And according to the results, leak
>>> suspect was NetworkBufferPool (image 2)
>>> *   Last analys, there were both two types of jobs running and leak
>>> suspect was again NetworkBufferPool. (image 3)
>>> In our system jobs are regularly cancelled and resubmitted so we
>>> noticed that when job is submitted some amount of memory allocated and
>>> after cancelation this allocated memory never freed. So over time
>>> memory usage is always increasing and exceeded the limits.
>>>
>>>
>>>>>
>>>
>>>
>>> Links:
>>> ------
>>> [1] https://issues.apache.org/jira/browse/FLINK-7845
>>>
>> Hi Piotr,
>>
>> There are two types of jobs.
>> In first, we use Kafka source and Kafka sink, there isn't any window
>> operator.
>> In second job, we use Kafka source, filesystem sink and elastic search
>> sink and window operator for buffering.
>>
>
>
>

Mime
View raw message