From: Piotr Nowojski <piotr@data-artisans.com>
Subject: Re: Flink memory leak
Date: Tue, 14 Nov 2017 17:29:28 +0100
To: Flavio Pompermaier <pompermaier@okkam.it>
Cc: ÇETİNKAYA EBRU <b20926247@cs.hacettepe.edu.tr>, Javier Lopez <javier.lopez@zalando.de>, Aljoscha Krettek <aljoscha@apache.org>, Nico Kruber, Ufuk Celebi <uce@apache.org>, user@flink.apache.org

Best would be to analyse memory usage via some profiler. What I have done was:

1. Run your scenario on the test cluster until memory consumption goes up.
2. Stop submitting new jobs and cancel all running jobs.
3. Manually trigger GC a couple of times via jconsole (other tools can do that as well).
4. Analyse memory consumption via:

A) Oracle's Mission Control (Java Mission Control, jmc)
   - analyse memory consumption and check which memory pool is growing (OldGen heap? Metaspace? Code Cache? Non-heap?)
   - run a flight recording with all memory options checked
   - check which objects were using a lot of memory
B) VisualVM
   - take a heap dump and analyse what is using up all of this memory
C) jconsole
   - it can tell you the memory pool status of your JVMs, but it will not tell you which objects are actually exhausting the pools

A couple of remarks:
- Because of GC, memory usage can go up and down. What matters is the trend of the local minimums measured just after a manually triggered GC.
- You might have to repeat steps 2, 3 and 4 to actually see what has increased between job submissions.
- By default, network buffers use 10% of the heap space as byte[], so you can ignore those.
- The JDK bug that I reproduced showed up as huge memory consumption by many char[] and ConcurrentHashMap$Node instances.
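If you prefer to script steps 3 and 4, here is a minimal sketch of doing them from inside the JVM, assuming a HotSpot JVM (the class name and the dump file name are only examples):

import java.lang.management.ManagementFactory;
import java.lang.management.MemoryPoolMXBean;

import com.sun.management.HotSpotDiagnosticMXBean;

public class HeapInspection {
    public static void main(String[] args) throws Exception {
        // Step 3: request a full GC so the numbers below reflect live objects only.
        System.gc();

        // Step 4: print per-pool usage (OldGen, Metaspace, Code Cache, ...) to spot the growing pool.
        for (MemoryPoolMXBean pool : ManagementFactory.getMemoryPoolMXBeans()) {
            System.out.printf("%-35s used = %d bytes%n", pool.getName(), pool.getUsage().getUsed());
        }

        // Write a heap dump that can be opened with VisualVM or Mission Control.
        HotSpotDiagnosticMXBean diagnostics =
                ManagementFactory.getPlatformMXBean(HotSpotDiagnosticMXBean.class);
        diagnostics.dumpHeap("taskmanager-heap.hprof", true); // true = only live objects
    }
}

Running it a couple of times after step 2 and comparing the OldGen numbers gives the same trend as the local minimums mentioned above.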
Piotrek

> On 14 Nov 2017, at 16:08, Flavio Pompermaier <pompermaier@okkam.it> wrote:
>
> What should we do to confirm it? Do you have any github repo to start from?
>
> On Tue, Nov 14, 2017 at 4:02 PM, Piotr Nowojski <piotr@data-artisans.com> wrote:
> Ebru, Javier, Flavio:
>
> I tried to reproduce the memory leak by submitting a job that was generating classes with random names, and indeed I have found one. Memory was accumulating in `char[]` instances that belonged to `java.lang.ClassLoader#parallelLockMap`. The OldGen memory pool was growing in size up to the point where I got:
>
> java.lang.OutOfMemoryError: Java heap space
>
> This seems like an old, known "feature" of the JDK:
> https://bugs.openjdk.java.net/browse/JDK-8037342
>
> Can any of you confirm that this is the issue you are experiencing? If not, I would really need more help/information from you to track this down.
>
> Piotrek
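A rough way to check whether you are hitting the same parallelLockMap growth, without taking a full heap dump, is to look at the map via reflection. This is only a sketch and assumes a JDK 7/8 class library where reflective access to java.lang.ClassLoader internals is still permitted:

import java.lang.reflect.Field;
import java.util.Map;

public class ParallelLockMapCheck {

    // Returns the number of entries held by the loader's parallelLockMap,
    // or -1 if the field is not present in this JVM.
    public static int entryCount(ClassLoader loader) {
        try {
            Field field = ClassLoader.class.getDeclaredField("parallelLockMap");
            field.setAccessible(true);
            Map<?, ?> lockMap = (Map<?, ?>) field.get(loader);
            return lockMap == null ? 0 : lockMap.size();
        } catch (ReflectiveOperationException e) {
            return -1;
        }
    }

    public static void main(String[] args) {
        System.out.println("parallelLockMap entries: "
                + entryCount(ParallelLockMapCheck.class.getClassLoader()));
    }
}

If that count keeps growing across job submissions and cancellations, you are most likely seeing the JDK-8037342 behaviour described above.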
>> On 10 Nov 2017, at 15:12, ÇETİNKAYA EBRU <b20926247@cs.hacettepe.edu.tr> wrote:
>>
>> On 2017-11-10 13:14, Piotr Nowojski wrote:
>>> jobmanager1.log and taskmanager2.log are the same. Can you also submit
>>> the files containing the std output?
>>> Piotrek
>>>> On 10 Nov 2017, at 09:35, ÇETİNKAYA EBRU <b20926247@cs.hacettepe.edu.tr> wrote:
>>>> On 2017-11-10 11:04, Piotr Nowojski wrote:
>>>>> Hi,
>>>>> Thanks for the logs, however I do not see the previously mentioned exceptions
>>>>> in it. It ends with a java.lang.InterruptedException.
>>>>> Is it the correct log file? Also, could you attach the std output file
>>>>> of the failing TaskManager?
>>>>> Piotrek
>>>>>> On 10 Nov 2017, at 08:42, ÇETİNKAYA EBRU <b20926247@cs.hacettepe.edu.tr> wrote:
>>>>>> On 2017-11-09 20:08, Piotr Nowojski wrote:
>>>>>>> Hi,
>>>>>>> Could you attach the full logs from those task managers? At first glance I
>>>>>>> don't see a connection between those exceptions and any memory issue
>>>>>>> that you might have had. It looks like a dependency issue in one (some?
>>>>>>> all?) of your jobs.
>>>>>>> Did you build your jars with the -Pbuild-jar profile as described here:
>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.3/quickstart/java_api_quickstart.html#build-project
>>>>>>> ?
>>>>>>> If that doesn't help, can you binary search which job is causing the
>>>>>>> problem? There might be some Flink incompatibility between different
>>>>>>> versions, and rebuilding a job's jar with a version matching the
>>>>>>> cluster version might help.
>>>>>>> Piotrek
>>>>>>>> On 9 Nov 2017, at 17:36, ÇETİNKAYA EBRU
>>>>>>>> <b20926247@cs.hacettepe.edu.tr> wrote:
>>>>>>>> On 2017-11-08 18:30, Piotr Nowojski wrote:
>>>>>>>> Btw, Ebru:
>>>>>>>> I don't agree that the main suspect is NetworkBufferPool. On your
>>>>>>>> screenshots its memory consumption was reasonable and stable: 596MB
>>>>>>>> -> 602MB -> 597MB.
>>>>>>>> PoolThreadCache memory usage of ~120MB is also reasonable.
>>>>>>>> Do you experience any problems, like Out Of Memory errors/crashes/long
>>>>>>>> GC pauses? Or is the JVM process just using more memory over time? You
>>>>>>>> are aware that the JVM doesn't like to release memory back to the OS once
>>>>>>>> it has been used? So increasing memory usage until hitting some limit (for
>>>>>>>> example the JVM max heap size) is expected behaviour.
>>>>>>>> Piotrek
>>>>>>>> On 8 Nov 2017, at 15:48, Piotr Nowojski <piotr@data-artisans.com>
>>>>>>>> wrote:
>>>>>>>> I don't know if this is relevant to this issue, but I was
>>>>>>>> constantly getting failures trying to reproduce this leak using your
>>>>>>>> job, because you were using a non-deterministic getKey function:
>>>>>>>> @Override
>>>>>>>> public Integer getKey(Integer event) {
>>>>>>>>     Random randomGen = new Random((new Date()).getTime());
>>>>>>>>     return randomGen.nextInt() % 8;
>>>>>>>> }
>>>>>>>> And quoting the Java doc of KeySelector:
>>>>>>>> "If invoked multiple times on the same object, the returned key must
>>>>>>>> be the same."
>>>>>>>> I'm trying to reproduce this issue with the following job:
>>>>>>>> https://gist.github.com/pnowojski/b80f725c1af7668051c773438637e0d3
>>>>>>>> where IntegerSource is just an infinite source and DiscardingSink is,
>>>>>>>> well, just discarding incoming data. I'm cancelling the job every 5
>>>>>>>> seconds and so far (after ~15 minutes) my memory consumption is
>>>>>>>> stable, well below the maximum java heap size.
>>>>>>>> Piotrek
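For reference, a deterministic KeySelector for a job like the one above could look like the following sketch (the modulo base of 8 is kept from the snippet quoted above; Math.floorMod avoids negative keys):

import org.apache.flink.api.java.functions.KeySelector;

// Deterministic replacement for the random getKey() quoted above: the same
// event always maps to the same key, as the KeySelector contract requires.
public class ModuloKeySelector implements KeySelector<Integer, Integer> {
    @Override
    public Integer getKey(Integer event) {
        return Math.floorMod(event, 8);
    }
}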
>>>>>>>> On 8 Nov 2017, at 15:28, Javier Lopez <javier.lopez@zalando.de> wrote:
>>>>>>>> Yes, I tested with just printing the stream. But it could take a
>>>>>>>> lot of time to fail.
>>>>>>>> On Wednesday, 8 November 2017, Piotr Nowojski
>>>>>>>> <piotr@data-artisans.com> wrote:
>>>>>>>> Thanks for the quick answer.
>>>>>>>> So it will also fail after some time with the `fromElements` source
>>>>>>>> instead of Kafka, right?
>>>>>>>> Did you try it also without a Kafka producer?
>>>>>>>> Piotrek
>>>>>>>> On 8 Nov 2017, at 14:57, Javier Lopez <javier.lopez@zalando.de>
>>>>>>>> wrote:
>>>>>>>> Hi,
>>>>>>>> You don't need data. With data it will die faster. I tested as
>>>>>>>> well with a small data set, using the fromElements source, but it
>>>>>>>> will take some time to die. It's better with some data.
>>>>>>>> On 8 November 2017 at 14:54, Piotr Nowojski
>>>>>>>> <piotr@data-artisans.com> wrote:
>>>>>>>> Hi,
>>>>>>>> Thanks for sharing this job.
>>>>>>>> Do I need to feed some data to Kafka to reproduce this
>>>>>>>> issue with your script?
>>>>>>>> Does this OOM issue also happen when you are not using the
>>>>>>>> Kafka source/sink?
>>>>>>>> Piotrek
>>>>>>>> On 8 Nov 2017, at 14:08, Javier Lopez <javier.lopez@zalando.de>
>>>>>>>> wrote:
>>>>>>>> Hi,
>>>>>>>> This is the test flink job we created to trigger this leak:
>>>>>>>> https://gist.github.com/javieredo/c6052404dbe6cc602e99f4669a09f7d6
>>>>>>>> And this is the python script we are using to execute the job
>>>>>>>> thousands of times to get the OOM problem:
>>>>>>>> https://gist.github.com/javieredo/4825324d5d5f504e27ca6c004396a107
>>>>>>>> The cluster we used for this has this configuration:
>>>>>>>> Instance type: t2.large
>>>>>>>> Number of workers: 2
>>>>>>>> HeapMemory: 5500
>>>>>>>> Number of task slots per node: 4
>>>>>>>> TaskMangMemFraction: 0.5
>>>>>>>> NumberOfNetworkBuffers: 2000
>>>>>>>> We have tried several things: increasing the heap, reducing the
>>>>>>>> heap, more memory fraction, changing this value in taskmanager.sh:
>>>>>>>> TM_MAX_OFFHEAP_SIZE="2G"; and nothing seems to work.
>>>>>>>> Thanks for your help.
>>>>>>>> On 8 November 2017 at 13:26, ÇETİNKAYA EBRU
>>>>>>>> <b20926247@cs.hacettepe.edu.tr> wrote:
>>>>>>>> On 2017-11-08 15:20, Piotr Nowojski wrote:
>>>>>>>> Hi Ebru and Javier,
>>>>>>>> Yes, if you could share this example job it would be helpful.
>>>>>>>> Ebru: could you explain in a little more detail what your job(s)
>>>>>>>> look like? Could you post some code? If you are just using maps and
>>>>>>>> filters there shouldn't be any network transfers involved, aside
>>>>>>>> from the Source and Sink functions.
>>>>>>>> Piotrek
>>>>>>>> On 8 Nov 2017, at 12:54, ebru
>>>>>>>> <b20926247@cs.hacettepe.edu.tr> wrote:
>>>>>>>> Hi Javier,
>>>>>>>> It would be helpful if you share your test job with us.
>>>>>>>> Which configurations did you try?
>>>>>>>> -Ebru
>>>>>>>> On 8 Nov 2017, at 14:43, Javier Lopez
>>>>>>>> <javier.lopez@zalando.de> wrote:
>>>>>>>> Hi,
>>>>>>>> We have been facing a similar problem. We have tried some different
>>>>>>>> configurations, as proposed in the other email thread by Flavio and
>>>>>>>> Kien, but it didn't work. We have a workaround similar to the one
>>>>>>>> that Flavio has: we restart the taskmanagers once they reach a
>>>>>>>> memory threshold. We created a small test to remove all of our
>>>>>>>> dependencies and leave only flink native libraries. This test reads
>>>>>>>> data from a Kafka topic and writes it back to another topic in
>>>>>>>> Kafka. We cancel the job and start another every 5 seconds. After
>>>>>>>> ~30 minutes of doing this process, the cluster reaches the OS memory
>>>>>>>> limit and dies.
>>>>>>>> Currently, we have a test cluster with 8 workers and 8 task slots
>>>>>>>> per node. We have one job that uses 56 slots, and we cannot execute
>>>>>>>> that job 5 times in a row because the whole cluster dies. If you
>>>>>>>> want, we can publish our test job.
>>>>>>>> Regards,
>>>>>>>> On 8 November 2017 at 11:20, Aljoscha Krettek
>>>>>>>> <aljoscha@apache.org> wrote:
>>>>>>>> @Nico & @Piotr Could you please have a look at this? You both
>>>>>>>> recently worked on the network stack and might be most familiar with
>>>>>>>> this.
>>>>>>>> On 8. Nov 2017, at 10:25, Flavio Pompermaier
>>>>>>>> <pompermaier@okkam.it> wrote:
>>>>>>>> We also have the same problem in production. At the moment the
>>>>>>>> solution is to restart the entire Flink cluster after every job.
>>>>>>>> We've tried to reproduce this problem with a test (see
>>>>>>>> https://issues.apache.org/jira/browse/FLINK-7845 [1]) but we don't
>>>>>>>> know whether the error produced by the test and the leak are
>>>>>>>> correlated.
>>>>>>>> Best,
>>>>>>>> Flavio
>>>>>>>> On Wed, Nov 8, 2017 at 9:51 AM, ÇETİNKAYA EBRU
>>>>>>>> <b20926247@cs.hacettepe.edu.tr> wrote:
>>>>>>>> On 2017-11-07 16:53, Ufuk Celebi wrote:
>>>>>>>> Do you use any windowing? If yes, could you please share that code?
>>>>>>>> If there is no stateful operation at all, it's strange where the
>>>>>>>> list state instances are coming from.
>>>>>>>> On Tue, Nov 7, 2017 at 2:35 PM, ebru
>>>>>>>> <b20926247@cs.hacettepe.edu.tr> wrote:
>>>>>>>> Hi Ufuk,
>>>>>>>> We don't explicitly define any state descriptor. We only use map
>>>>>>>> and filter operators. We thought that GC handles clearing Flink's
>>>>>>>> internal states.
>>>>>>>> So how can we manage the memory if it is always increasing?
>>>>>>>> - Ebru
>>>>>>>> On 7 Nov 2017, at 16:23, Ufuk Celebi <uce@apache.org> wrote:
>>>>>>>> Hey Ebru, the memory usage might be increasing as long as a job is
>>>>>>>> running.
>>>>>>>> This is expected (also in the case of multiple running jobs). The
>>>>>>>> screenshots are not helpful in that regard. :-(
>>>>>>>> What kind of stateful operations are you using? Depending on your
>>>>>>>> use case, you have to manually call `clear()` on the state instance
>>>>>>>> in order to release the managed state.
>>>>>>>> Best,
>>>>>>>> Ufuk
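As a concrete, illustrative example of what calling `clear()` looks like for keyed managed state (the class name, field names and the threshold of 100 below are made up, not taken from the actual jobs):

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

// A keyed function that accumulates a per-key count and releases the
// managed state with clear() once it has emitted a result.
public class CountAndClear extends RichFlatMapFunction<Integer, Integer> {

    private transient ValueState<Integer> count;

    @Override
    public void open(Configuration parameters) {
        count = getRuntimeContext().getState(
                new ValueStateDescriptor<>("count", Integer.class));
    }

    @Override
    public void flatMap(Integer value, Collector<Integer> out) throws Exception {
        Integer current = count.value();
        int updated = (current == null ? 0 : current) + 1;
        if (updated >= 100) {
            out.collect(updated);
            count.clear();           // release the managed state for this key
        } else {
            count.update(updated);
        }
    }
}

Without the `clear()` call, the state for each key stays referenced by the state backend until the job is cancelled.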
>>>>>>>> On Tue, Nov 7, 2017 at 12:43 PM, ebru
>>>>>>>> <b20926247@cs.hacettepe.edu.tr> wrote:
>>>>>>>> Begin forwarded message:
>>>>>>>> From: ebru <b20926247@cs.hacettepe.edu.tr>
>>>>>>>> Subject: Re: Flink memory leak
>>>>>>>> Date: 7 November 2017 at 14:09:17 GMT+3
>>>>>>>> To: Ufuk Celebi <uce@apache.org>
>>>>>>>> Hi Ufuk,
>>>>>>>> There are three snapshots of htop output.
>>>>>>>> 1. snapshot is the initial state.
>>>>>>>> 2. snapshot is after one job was submitted.
>>>>>>>> 3. snapshot is the output of the one job with 15000 EPS. And the
>>>>>>>> memory usage is always increasing over time.
>>>>>>>> <1.png><2.png><3.png>
>>>>>>>> On 7 Nov 2017, at 13:34, Ufuk Celebi <uce@apache.org> wrote:
>>>>>>>> Hey Ebru,
>>>>>>>> let me pull in Aljoscha (CC'd) who might have an idea what's causing
>>>>>>>> this.
>>>>>>>> Since multiple jobs are running, it will be hard to understand to
>>>>>>>> which job the state descriptors from the heap snapshot belong.
>>>>>>>> - Is it possible to isolate the problem and reproduce the behaviour
>>>>>>>> with only a single job?
>>>>>>>> – Ufuk
>>>>>>>> On Tue, Nov 7, 2017 at 10:27 AM, ÇETİNKAYA EBRU
>>>>>>>> <b20926247@cs.hacettepe.edu.tr> wrote:
>>>>>>>> Hi,
>>>>>>>> We are using Flink 1.3.1 in production; we have one job manager and
>>>>>>>> 3 task managers in standalone mode. Recently, we've noticed that we
>>>>>>>> have memory related problems. We use a docker container to serve the
>>>>>>>> Flink cluster. We have 300 slots and 20 jobs are running with a
>>>>>>>> parallelism of 10. Also the job count may change over time.
>>>>>>>> Taskmanager memory usage always increases. After job cancellation
>>>>>>>> this memory usage doesn't decrease. We've tried to investigate the
>>>>>>>> problem and we've got the task manager JVM heap snapshot. According
>>>>>>>> to the heap analysis, the possible memory leak was Flink's list
>>>>>>>> state descriptor. But we are not sure that is the cause of our
>>>>>>>> memory problem. How can we solve the problem?
>>>>>>>> We have two types of Flink job. One has no stateful operator and
>>>>>>>> contains only maps and filters; the other has a time window with a
>>>>>>>> count trigger.
>>>>>>>> * We've analysed the JVM heaps again in different conditions. First
>>>>>>>> we analysed the snapshot when no Flink jobs were running on the
>>>>>>>> cluster. (image 1)
>>>>>>>> * Then, we analysed the JVM heap snapshot when the Flink job that has
>>>>>>>> no stateful operator was running. According to the results, the leak
>>>>>>>> suspect was NetworkBufferPool. (image 2)
>>>>>>>> * In the last analysis, both types of jobs were running and the leak
>>>>>>>> suspect was again NetworkBufferPool. (image 3)
>>>>>>>> In our system jobs are regularly cancelled and resubmitted, and we
>>>>>>>> noticed that when a job is submitted some amount of memory is
>>>>>>>> allocated, and after cancellation this allocated memory is never
>>>>>>>> freed. So over time memory usage is always increasing and exceeds
>>>>>>>> the limits.
>>>>>>>> Links:
>>>>>>>> ------
>>>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-7845
>>>>>>> Hi Piotr,
>>>>>>> There are two types of jobs.
>>>>>>> In the first, we use a Kafka source and a Kafka sink; there isn't any
>>>>>>> window operator.
>>>>>>> In the second job, we use a Kafka source, a filesystem sink and an
>>>>>>> elasticsearch sink, and a window operator for buffering.
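For the second type of job (a time window with a count trigger), a stripped-down sketch of that pattern, using the 1.3-era DataStream API, looks roughly like this; the source, key, window size and trigger count are placeholders, not the actual job:

import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;

public class WindowWithCountTriggerJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromElements(1, 2, 3, 4, 5, 6, 7, 8)
                .keyBy(new KeySelector<Integer, Integer>() {
                    @Override
                    public Integer getKey(Integer value) {
                        return Math.floorMod(value, 2); // deterministic key
                    }
                })
                .timeWindow(Time.seconds(10))     // time window, but ...
                .trigger(CountTrigger.of(4))      // ... fired every 4 elements per key
                .reduce(new ReduceFunction<Integer>() {
                    @Override
                    public Integer reduce(Integer a, Integer b) {
                        return a + b;
                    }
                })
                .print();

        env.execute("window-with-count-trigger");
    }
}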
>>>>>>> Hi Piotrek,
>>>>>>> Thanks for your reply.
>>>>>>> We've tested our Flink cluster again. We have 360 slots, and our
>>>>>>> cluster configuration is like this:
>>>>>>> jobmanager.rpc.address: %JOBMANAGER%
>>>>>>> jobmanager.rpc.port: 6123
>>>>>>> jobmanager.heap.mb: 1536
>>>>>>> taskmanager.heap.mb: 1536
>>>>>>> taskmanager.numberOfTaskSlots: 120
>>>>>>> taskmanager.memory.preallocate: false
>>>>>>> parallelism.default: 1
>>>>>>> jobmanager.web.port: 8081
>>>>>>> state.backend: filesystem
>>>>>>> state.backend.fs.checkpointdir: file:///storage/%CHECKPOINTDIR%
>>>>>>> state.checkpoints.dir: file:///storage/%CHECKPOINTDIR%
>>>>>>> taskmanager.network.numberOfBuffers: 5000
>>>>>>> We are using a docker based Flink cluster.
>>>>>>> We submitted 36 jobs with a parallelism of 10. After all slots became
>>>>>>> full, memory usage kept increasing over time and one by one the task
>>>>>>> managers started to die. The exception was like this:
>>>>>>> Taskmanager1 log:
>>>>>>> Uncaught error from thread [flink-akka.actor.default-dispatcher-17]
>>>>>>> shutting down JVM since 'akka.jvm-exit-on-fatal-error' is enabled for
>>>>>>> ActorSystem[flink]
>>>>>>> java.lang.NoClassDefFoundError: org/apache/kafka/common/metrics/stats/Rate$1
>>>>>>>   at org.apache.kafka.common.metrics.stats.Rate.convert(Rate.java:93)
>>>>>>>   at org.apache.kafka.common.metrics.stats.Rate.measure(Rate.java:62)
>>>>>>>   at org.apache.kafka.common.metrics.KafkaMetric.value(KafkaMetric.java:61)
>>>>>>>   at org.apache.kafka.common.metrics.KafkaMetric.value(KafkaMetric.java:52)
>>>>>>>   at org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaMetricWrapper.getValue(KafkaMetricWrapper.java:35)
>>>>>>>   at org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaMetricWrapper.getValue(KafkaMetricWrapper.java:26)
>>>>>>>   at org.apache.flink.runtime.metrics.dump.MetricDumpSerialization.serializeGauge(MetricDumpSerialization.java:213)
>>>>>>>   at org.apache.flink.runtime.metrics.dump.MetricDumpSerialization.access$200(MetricDumpSerialization.java:50)
>>>>>>>   at org.apache.flink.runtime.metrics.dump.MetricDumpSerialization$MetricDumpSerializer.serialize(MetricDumpSerialization.java:138)
>>>>>>>   at org.apache.flink.runtime.metrics.dump.MetricQueryService.onReceive(MetricQueryService.java:109)
>>>>>>>   at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:167)
>>>>>>>   at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
>>>>>>>   at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:97)
>>>>>>>   at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>>>>>>>   at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>>>>>>>   at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
>>>>>>>   at akka.dispatch.Mailbox.run(Mailbox.scala:220)
>>>>>>>   at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
>>>>>>>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>>>>>   at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>>>>>   at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>>>>>   at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>>>>> Caused by: java.lang.ClassNotFoundException: org.apache.kafka.common.metrics.stats.Rate$1
>>>>>>>   at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>>>>>>>   at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>>>>>>>   at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>>>>>>>   ... 22 more
>>>>>>> Taskmanager2 log:
>>>>>>> Uncaught error from thread [flink-akka.actor.default-dispatcher-17]
>>>>>>> shutting down JVM since 'akka.jvm-exit-on-fatal-error' is enabled for
>>>>>>> ActorSystem[flink]
>>>>>>> java.lang.NoClassDefFoundError: org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher$1
>>>>>>>   at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher$OffsetGauge.getValue(AbstractFetcher.java:492)
>>>>>>>   at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher$OffsetGauge.getValue(AbstractFetcher.java:480)
>>>>>>>   at org.apache.flink.runtime.metrics.dump.MetricDumpSerialization.serializeGauge(MetricDumpSerialization.java:213)
>>>>>>>   at org.apache.flink.runtime.metrics.dump.MetricDumpSerialization.access$200(MetricDumpSerialization.java:50)
>>>>>>>   at org.apache.flink.runtime.metrics.dump.MetricDumpSerialization$MetricDumpSerializer.serialize(MetricDumpSerialization.java:138)
>>>>>>>   at org.apache.flink.runtime.metrics.dump.MetricQueryService.onReceive(MetricQueryService.java:109)
>>>>>>>   at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:167)
>>>>>>>   at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
>>>>>>>   at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:97)
>>>>>>>   at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>>>>>>>   at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>>>>>>>   at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
>>>>>>>   at akka.dispatch.Mailbox.run(Mailbox.scala:220)
>>>>>>>   at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
>>>>>>>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>>>>>   at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>>>>>   at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>>>>>   at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>>>>> Caused by: java.lang.ClassNotFoundException: org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher$1
>>>>>>>   at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>>>>>>>   at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>>>>>>>   at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>>>>>>>   ... 18 more
>>>>>>> -Ebru
>>>>>> Hi Piotrek,
>>>>>> We attached the full log of taskmanager1.
>>>>>> This may not be a dependency issue, because until all of the task slots
>>>>>> are full we don't get any NoClassDefFoundError; when there is available
>>>>>> memory, jobs can run without exceptions for days.
>>>>>> Also there is a "Kafka instance already exists" exception in the full
>>>>>> log, but this is not relevant and doesn't affect the jobs or task
>>>>>> managers.
>>>>>> -Ebru<taskmanager1.log.zip>
>>>> Hi,
>>>> Sorry, we attached the wrong log file. I've attached all task managers'
>>>> and the job manager's logs. All task managers and the job manager were
>>>> killed.<logs.zip>
>> <logs2-2.zip>
>
> --
> Flavio Pompermaier
> Development Department
>
> OKKAM S.r.l.
> Tel. +(39) 0461 041809