flink-user mailing list archives

From Javier Lopez <javier.lo...@zalando.de>
Subject Re: Flink memory leak
Date Wed, 08 Nov 2017 14:28:43 GMT
Yes, I tested with just printing the stream. But it could take a lot of
time to fail.

On Wednesday, 8 November 2017, Piotr Nowojski <piotr@data-artisans.com>
wrote:
> Thanks for the quick answer.
> So it will also fail after some time with `fromElements` source instead
of Kafka, right?
> Did you try it also without a Kafka producer?
> Piotrek
>
> On 8 Nov 2017, at 14:57, Javier Lopez <javier.lopez@zalando.de> wrote:
> Hi,
> You don't need data. With data it will die faster. I tested as well with
a small data set, using the fromElements source, but it will take some time
to die. It's better with some data.
> On 8 November 2017 at 14:54, Piotr Nowojski <piotr@data-artisans.com>
wrote:
>>
>> Hi,
>> Thanks for sharing this job.
>> Do I need to feed some data to Kafka to reproduce this issue with
your script?
>> Does this OOM issue also happen when you are not using the Kafka
source/sink?
>> Piotrek
>>
>> On 8 Nov 2017, at 14:08, Javier Lopez <javier.lopez@zalando.de> wrote:
>> Hi,
>> This is the test flink job we created to trigger this leak
https://gist.github.com/javieredo/c6052404dbe6cc602e99f4669a09f7d6
>> And this is the python script we are using to execute the job thousands
of times to get the OOM problem
https://gist.github.com/javieredo/4825324d5d5f504e27ca6c004396a107
>> The cluster we used for this has this configuration:
>>
>> Instance type: t2.large
>> Number of workers: 2
>> HeapMemory: 5500
>> Number of task slots per node: 4
>> TaskMangMemFraction: 0.5
>> NumberOfNetworkBuffers: 2000
>>
>> We have tried several things: increasing the heap, reducing the heap,
a larger memory fraction, and changing this value in taskmanager.sh:
TM_MAX_OFFHEAP_SIZE="2G". Nothing seems to work.
>> Thanks for your help.
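
[Editor's sketch, not the script from the gist above.] The reproduction described in this thread (submit a job, wait about 5 seconds, cancel it, repeat) could look roughly like the following. It assumes the `flink` CLI is on the PATH and that `flink run -d` prints a line containing "JobID" followed by a 32-character hex id; the exact output format varies between Flink versions. The function names, the regular expression, and the jar path are hypothetical.

```python
import re
import subprocess
import time

# Assumption: the CLI prints "JobID <id>" or "JobID: <id>" after a detached submit.
JOB_ID_PATTERN = re.compile(r"JobID:?\s+([0-9a-f]{32})")


def run_cli(args):
    """Run a command and return its stdout as text (raises on a non-zero exit)."""
    return subprocess.run(args, capture_output=True, text=True, check=True).stdout


def submit_and_cancel(jar_path, cycles, pause_seconds=5, run=run_cli, sleep=time.sleep):
    """Submit the job in detached mode, wait, cancel it, and repeat `cycles` times.

    `run` and `sleep` are injectable so the loop can be exercised without a cluster.
    Returns the list of job ids that were submitted and cancelled.
    """
    job_ids = []
    for _ in range(cycles):
        output = run(["flink", "run", "-d", jar_path])
        match = JOB_ID_PATTERN.search(output)
        if match is None:
            raise RuntimeError("could not find a JobID in: " + output)
        job_id = match.group(1)
        sleep(pause_seconds)  # let the job run briefly before cancelling it
        run(["flink", "cancel", job_id])
        job_ids.append(job_id)
    return job_ids
```

Calling `submit_and_cancel("/path/to/test-job.jar", cycles=1000)` while watching taskmanager memory (for example with htop) would approximate the test described above; the jar path is a placeholder.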
>> On 8 November 2017 at 13:26, ÇETİNKAYA EBRU ÇETİNKAYA EBRU <
b20926247@cs.hacettepe.edu.tr> wrote:
>>>
>>> On 2017-11-08 15:20, Piotr Nowojski wrote:
>>>>
>>>> Hi Ebru and Javier,
>>>>
>>>> Yes, if you could share this example job it would be helpful.
>>>>
>>>> Ebru: could you explain in a little more detail what your job(s)
>>>> look like? Could you post some code? If you are just using maps and
>>>> filters there shouldn’t be any network transfers involved, aside
>>>> from Source and Sink functions.
>>>>
>>>> Piotrek
>>>>
>>>>> On 8 Nov 2017, at 12:54, ebru <b20926247@cs.hacettepe.edu.tr> wrote:
>>>>>
>>>>> Hi Javier,
>>>>>
>>>>> It would be helpful if you share your test job with us.
>>>>> Which configurations did you try?
>>>>>
>>>>> -Ebru
>>>>>
>>>>> On 8 Nov 2017, at 14:43, Javier Lopez <javier.lopez@zalando.de>
>>>>> wrote:
>>>>>
>>>>> Hi,
>>>>>
>>>>> We have been facing a similar problem. We have tried some different
>>>>> configurations, as proposed in another email thread by Flavio and
>>>>> Kien, but they didn't work. We have a workaround similar to the one
>>>>> that Flavio has: we restart the taskmanagers once they reach a
>>>>> memory threshold. We created a small test to remove all of our
>>>>> dependencies and leave only flink native libraries. This test reads
>>>>> data from a Kafka topic and writes it back to another topic in
>>>>> Kafka. We cancel the job and start another every 5 seconds. After
>>>>> ~30 minutes of doing this process, the cluster reaches the OS memory
>>>>> limit and dies.
>>>>>
>>>>> Currently, we have a test cluster with 8 workers and 8 task slots
>>>>> per node. We have one job that uses 56 slots, and we cannot execute
>>>>> that job 5 times in a row because the whole cluster dies. If you
>>>>> want, we can publish our test job.
>>>>>
>>>>> Regards,
>>>>>
>>>>> On 8 November 2017 at 11:20, Aljoscha Krettek <aljoscha@apache.org>
>>>>> wrote:
>>>>>
>>>>> @Nico & @Piotr Could you please have a look at this? You both
>>>>> recently worked on the network stack and might be most familiar with
>>>>> this.
>>>>>
>>>>> On 8. Nov 2017, at 10:25, Flavio Pompermaier <pompermaier@okkam.it>
>>>>> wrote:
>>>>>
>>>>> We also have the same problem in production. At the moment the
>>>>> solution is to restart the entire Flink cluster after every job.
>>>>> We've tried to reproduce this problem with a test (see
>>>>> https://issues.apache.org/jira/browse/FLINK-7845 [1]) but we don't
>>>>> know whether the error produced by the test and the leak are
>>>>> correlated.
>>>>>
>>>>> Best,
>>>>> Flavio
>>>>>
>>>>> On Wed, Nov 8, 2017 at 9:51 AM, ÇETİNKAYA EBRU ÇETİNKAYA EBRU
>>>>> <b20926247@cs.hacettepe.edu.tr> wrote:
>>>>> On 2017-11-07 16:53, Ufuk Celebi wrote:
>>>>> Do you use any windowing? If yes, could you please share that code?
>>>>> If there is no stateful operation at all, it's strange where the list
>>>>> state instances are coming from.
>>>>>
>>>>> On Tue, Nov 7, 2017 at 2:35 PM, ebru <b20926247@cs.hacettepe.edu.tr>
>>>>> wrote:
>>>>> Hi Ufuk,
>>>>>
>>>>> We don’t explicitly define any state descriptor. We only use map
>>>>> and filter operators. We thought that GC handles clearing Flink’s
>>>>> internal state.
>>>>> So how can we manage the memory if it is always increasing?
>>>>>
>>>>> - Ebru
>>>>>
>>>>> On 7 Nov 2017, at 16:23, Ufuk Celebi <uce@apache.org> wrote:
>>>>>
>>>>> Hey Ebru, the memory usage might be increasing as long as a job is
>>>>> running.
>>>>> This is expected (also in the case of multiple running jobs). The
>>>>> screenshots are not helpful in that regard. :-(
>>>>>
>>>>> What kind of stateful operations are you using? Depending on your
>>>>> use case,
>>>>> you have to manually call `clear()` on the state instance in order
>>>>> to
>>>>> release the managed state.
>>>>>
>>>>> Best,
>>>>>
>>>>> Ufuk
>>>>>
>>>>> On Tue, Nov 7, 2017 at 12:43 PM, ebru
>>>>> <b20926247@cs.hacettepe.edu.tr> wrote:
>>>>>
>>>>> Begin forwarded message:
>>>>>
>>>>> From: ebru <b20926247@cs.hacettepe.edu.tr>
>>>>> Subject: Re: Flink memory leak
>>>>> Date: 7 November 2017 at 14:09:17 GMT+3
>>>>> To: Ufuk Celebi <uce@apache.org>
>>>>>
>>>>> Hi Ufuk,
>>>>>
>>>>> There are three snapshots of htop output.
>>>>> 1. The first snapshot is the initial state.
>>>>> 2. The second snapshot is after submitting one job.
>>>>> 3. The third snapshot is the output of the one job with 15000 EPS. The
>>>>> memory usage is always increasing over time.
>>>>>
>>>>> <1.png><2.png><3.png>
>>>>>
>>>>> On 7 Nov 2017, at 13:34, Ufuk Celebi <uce@apache.org> wrote:
>>>>>
>>>>> Hey Ebru,
>>>>>
>>>>> let me pull in Aljoscha (CC'd) who might have an idea what's causing
>>>>> this.
>>>>>
>>>>> Since multiple jobs are running, it will be hard to understand
>>>>> which job the state descriptors from the heap snapshot belong to.
>>>>> - Is it possible to isolate the problem and reproduce the behaviour
>>>>> with only a single job?
>>>>>
>>>>> – Ufuk
>>>>>
>>>>> On Tue, Nov 7, 2017 at 10:27 AM, ÇETİNKAYA EBRU ÇETİNKAYA EBRU
>>>>> <b20926247@cs.hacettepe.edu.tr> wrote:
>>>>>
>>>>> Hi,
>>>>>
>>>>> We are using Flink 1.3.1 in production; we have one job manager and
>>>>> 3 task managers in standalone mode. Recently, we've noticed that we
>>>>> have memory-related problems. We use Docker to serve the Flink
>>>>> cluster. We have 300 slots, and 20 jobs are running with a
>>>>> parallelism of 10. The job count may also change over time.
>>>>> Taskmanager memory usage always increases. After job cancellation
>>>>> this memory usage doesn't decrease. We've tried to investigate the
>>>>> problem and we've taken a task manager JVM heap snapshot. According
>>>>> to the JVM heap analysis, a possible memory leak was the Flink list
>>>>> state descriptor. But we are not sure that is the cause of our
>>>>> memory problem. How can we solve the problem?
>>>>>
>>>>> We have two types of Flink job. One has no stateful operator and
>>>>> contains only maps and filters, and the other has a time window with
>>>>> a count trigger.
>>>>
>>>>  * We've analysed the JVM heaps again under different conditions. First
>>>> we analysed the snapshot when no Flink jobs were running on the cluster.
>>>> (image 1)
>>>>  * Then we analysed the JVM heap snapshot when the Flink job that has
>>>> no stateful operator was running. According to the results, the leak
>>>> suspect was NetworkBufferPool. (image 2)
>>>>  * In the last analysis, both types of jobs were running and the leak
>>>> suspect was again NetworkBufferPool. (image 3)
>>>> In our system jobs are regularly cancelled and resubmitted, and we
>>>> noticed that when a job is submitted some amount of memory is allocated,
>>>> and after cancellation this allocated memory is never freed. So over
>>>> time memory usage keeps increasing and exceeds the limits.
>>>>
>>>>>>
>>>>
>>>>
>>>>
>>>> Links:
>>>> ------
>>>> [1] https://issues.apache.org/jira/browse/FLINK-7845
>>>
>>> Hi Piotr,
>>>
>>> There are two types of jobs.
>>> In the first, we use a Kafka source and a Kafka sink; there isn't any
window operator.
>>> In the second job, we use a Kafka source, a filesystem sink, an
Elasticsearch sink, and a window operator for buffering.
>>
>>
>
>
>
