From: Piotr Nowojski <piotr@data-artisans.com>
Subject: Re: Flink memory leak
Date: Tue, 14 Nov 2017 16:02:24 +0100
To: ÇETİNKAYA EBRU <b20926247@cs.hacettepe.edu.tr>, Javier Lopez <javier.lopez@zalando.de>, pompermaier@okkam.it
Cc: Aljoscha Krettek <aljoscha@apache.org>, Nico Kruber, Ufuk Celebi <uce@apache.org>, user <user@flink.apache.org>

Ebru, Javier, Flavio:

I tried to reproduce the memory leak by submitting a job that was generating classes with random names, and indeed I found one. Memory was accumulating in `char[]` instances that belonged to `java.lang.ClassLoader#parallelLockMap`. The OldGen memory pool was growing in size up to the point where I got:

java.lang.OutOfMemoryError: Java heap space

This seems to be an old, known "feature" of the JDK:
https://bugs.openjdk.java.net/browse/JDK-8037342

Can any of you confirm that this is the issue you are experiencing? If not, I would really need more help/information from you to track this down.

Piotrek
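The JDK mechanism behind this can be sketched in a few lines. The following is only an illustration of JDK-8037342, not the exact job that was submitted; the class-name prefix is made up. Every loadClass() call on a parallel-capable class loader registers a per-name lock object in ClassLoader#parallelLockMap, and those entries are never removed, so loading classes under ever-changing names keeps accumulating the name Strings (and their backing char[] arrays) until the heap fills up.

import java.util.UUID;

// Sketch only: shows how per-name lock entries pile up in
// ClassLoader#parallelLockMap (JDK-8037342). The class names are fabricated.
public class ParallelLockMapLeak {
    public static void main(String[] args) {
        ClassLoader loader = ParallelLockMapLeak.class.getClassLoader();
        while (true) {
            // A fresh, random class name on every iteration.
            String randomName = "com.example.generated.C"
                    + UUID.randomUUID().toString().replace("-", "");
            try {
                // getClassLoadingLock(randomName) is called internally and inserts
                // a lock object keyed by the name into parallelLockMap before the
                // lookup fails; the entry is never cleaned up afterwards.
                loader.loadClass(randomName);
            } catch (ClassNotFoundException expected) {
                // Expected: the class does not exist. The lock entry stays behind.
            }
        }
    }
}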
> On 10 Nov 2017, at 15:12, ÇETİNKAYA EBRU <b20926247@cs.hacettepe.edu.tr> wrote:
>
> On 2017-11-10 13:14, Piotr Nowojski wrote:
>> jobmanager1.log and taskmanager2.log are the same. Can you also submit
>> the files containing the std output?
>> Piotrek
>>
>>> On 10 Nov 2017, at 09:35, ÇETİNKAYA EBRU <b20926247@cs.hacettepe.edu.tr> wrote:
>>> On 2017-11-10 11:04, Piotr Nowojski wrote:
>>>> Hi,
>>>> Thanks for the logs; however, I do not see the previously mentioned exceptions
>>>> in them. The log ends with a java.lang.InterruptedException.
>>>> Is it the correct log file? Also, could you attach the std output file
>>>> of the failing TaskManager?
>>>> Piotrek
>>>>
>>>>> On 10 Nov 2017, at 08:42, ÇETİNKAYA EBRU <b20926247@cs.hacettepe.edu.tr> wrote:
>>>>> On 2017-11-09 20:08, Piotr Nowojski wrote:
>>>>>> Hi,
>>>>>> Could you attach the full logs from those task managers? At first glance I
>>>>>> don't see a connection between those exceptions and any memory issue
>>>>>> that you might have had. It looks like a dependency issue in one (some?
>>>>>> all?) of your jobs.
>>>>>> Did you build your jars with the -Pbuild-jar profile as described here:
>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.3/quickstart/java_api_quickstart.html#build-project
>>>>>> ?
>>>>>> If that doesn't help, can you binary-search which job is causing the
>>>>>> problem? There might be some Flink incompatibility between different
>>>>>> versions, and rebuilding a job's jar with a version matching the
>>>>>> cluster version might help.
>>>>>> Piotrek
>>>>>>
>>>>>>> On 9 Nov 2017, at 17:36, ÇETİNKAYA EBRU <b20926247@cs.hacettepe.edu.tr> wrote:
>>>>>>> On 2017-11-08 18:30, Piotr Nowojski wrote:
>>>>>>> Btw, Ebru:
>>>>>>> I don't agree that the main suspect is NetworkBufferPool. In your
>>>>>>> screenshots its memory consumption was reasonable and stable: 596MB
>>>>>>> -> 602MB -> 597MB.
>>>>>>> PoolThreadCache memory usage of ~120MB is also reasonable.
>>>>>>> Do you experience any problems, like Out Of Memory errors, crashes, or long
>>>>>>> GC pauses? Or is the JVM process just using more memory over time? You
>>>>>>> are aware that the JVM doesn't like to release memory back to the OS once it
>>>>>>> has used it? So increasing memory usage until hitting some limit (for
>>>>>>> example the JVM max heap size) is expected behaviour.
>>>>>>> Piotrek
>>>>>>>
>>>>>>> On 8 Nov 2017, at 15:48, Piotr Nowojski <piotr@data-artisans.com> wrote:
>>>>>>> I don't know if this is relevant to this issue, but I was
>>>>>>> constantly getting failures trying to reproduce this leak using your
>>>>>>> job, because you were using a non-deterministic getKey function:
>>>>>>> @Override
>>>>>>> public Integer getKey(Integer event) {
>>>>>>>     Random randomGen = new Random((new Date()).getTime());
>>>>>>>     return randomGen.nextInt() % 8;
>>>>>>> }
>>>>>>> And quoting the Javadoc of KeySelector:
>>>>>>> "If invoked multiple times on the same object, the returned key must
>>>>>>> be the same."
>>>>>>> I'm trying to reproduce this issue with the following job:
>>>>>>> https://gist.github.com/pnowojski/b80f725c1af7668051c773438637e0d3
>>>>>>> where IntegerSource is just an infinite source and DiscardingSink is,
>>>>>>> well, just discarding incoming data. I'm cancelling the job every 5
>>>>>>> seconds, and so far (after ~15 minutes) my memory consumption is
>>>>>>> stable, well below the maximum Java heap size.
>>>>>>> Piotrek
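For contrast, a deterministic KeySelector that keeps the same 8-way bucketing could look like the sketch below. The event type and modulus are taken from the snippet quoted above; the class name is invented.

import org.apache.flink.api.java.functions.KeySelector;

// Sketch: the key is derived only from the event itself, so repeated
// invocations on the same element always return the same key, as the
// KeySelector contract requires.
public class ModuloKeySelector implements KeySelector<Integer, Integer> {
    @Override
    public Integer getKey(Integer event) {
        return Math.floorMod(event, 8); // stable bucket in 0..7, no Random involved
    }
}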
>>>>>>>
>>>>>>> On 8 Nov 2017, at 15:28, Javier Lopez <javier.lopez@zalando.de> wrote:
>>>>>>> Yes, I tested with just printing the stream. But it could take a
>>>>>>> lot of time to fail.
>>>>>>>
>>>>>>> On Wednesday, 8 November 2017, Piotr Nowojski <piotr@data-artisans.com> wrote:
>>>>>>> Thanks for the quick answer.
>>>>>>> So it will also fail after some time with the `fromElements` source
>>>>>>> instead of Kafka, right?
>>>>>>> Did you also try it without a Kafka producer?
>>>>>>> Piotrek
>>>>>>>
>>>>>>> On 8 Nov 2017, at 14:57, Javier Lopez <javier.lopez@zalando.de> wrote:
>>>>>>> Hi,
>>>>>>> You don't need data. With data it will die faster. I tested as
>>>>>>> well with a small data set, using the fromElements source, but it
>>>>>>> will take some time to die. It's better with some data.
>>>>>>>
>>>>>>> On 8 November 2017 at 14:54, Piotr Nowojski <piotr@data-artisans.com> wrote:
>>>>>>> Hi,
>>>>>>> Thanks for sharing this job.
>>>>>>> Do I need to feed some data to Kafka to reproduce this issue with your script?
>>>>>>> Does this OOM issue also happen when you are not using the Kafka source/sink?
>>>>>>> Piotrek
>>>>>>>
>>>>>>> On 8 Nov 2017, at 14:08, Javier Lopez <javier.lopez@zalando.de> wrote:
>>>>>>> Hi,
>>>>>>> This is the test Flink job we created to trigger this leak:
>>>>>>> https://gist.github.com/javieredo/c6052404dbe6cc602e99f4669a09f7d6
>>>>>>> And this is the Python script we are using to execute the job
>>>>>>> thousands of times to get the OOM problem:
>>>>>>> https://gist.github.com/javieredo/4825324d5d5f504e27ca6c004396a107
>>>>>>> The cluster we used for this has this configuration:
>>>>>>> Instance type: t2.large
>>>>>>> Number of workers: 2
>>>>>>> HeapMemory: 5500
>>>>>>> Number of task slots per node: 4
>>>>>>> TaskMangMemFraction: 0.5
>>>>>>> NumberOfNetworkBuffers: 2000
>>>>>>> We have tried several things: increasing the heap, reducing the
>>>>>>> heap, more memory fraction, changing this value in
>>>>>>> taskmanager.sh ("TM_MAX_OFFHEAP_SIZE=2G"); and nothing seems to
>>>>>>> work.
>>>>>>> Thanks for your help.
>>>>>>>
>>>>>>> On 8 November 2017 at 13:26, ÇETİNKAYA EBRU <b20926247@cs.hacettepe.edu.tr> wrote:
>>>>>>> On 2017-11-08 15:20, Piotr Nowojski wrote:
>>>>>>> Hi Ebru and Javier,
>>>>>>> Yes, if you could share this example job it would be helpful.
>>>>>>> Ebru: could you explain in a little more detail what your Job(s)
>>>>>>> look like? Could you post some code? If you are just using
>>>>>>> maps and filters there shouldn't be any network transfers involved,
>>>>>>> aside from the Source and Sink functions.
>>>>>>> Piotrek
>>>>>>>
>>>>>>> On 8 Nov 2017, at 12:54, ebru <b20926247@cs.hacettepe.edu.tr> wrote:
>>>>>>> Hi Javier,
>>>>>>> It would be helpful if you share your test job with us.
>>>>>>> Which configurations did you try?
>>>>>>> -Ebru
>>>>>>>
>>>>>>> On 8 Nov 2017, at 14:43, Javier Lopez <javier.lopez@zalando.de> wrote:
>>>>>>> Hi,
>>>>>>> We have been facing a similar problem. We have tried some different
>>>>>>> configurations, as proposed in the other email thread by Flavio and
>>>>>>> Kien, but it didn't work. We have a workaround similar to the one
>>>>>>> that Flavio has: we restart the taskmanagers once they reach a
>>>>>>> memory threshold. We created a small test to remove all of our
>>>>>>> dependencies and leave only the Flink native libraries. This test reads
>>>>>>> data from a Kafka topic and writes it back to another topic in
>>>>>>> Kafka. We cancel the job and start another every 5 seconds. After
>>>>>>> ~30 minutes of doing this, the cluster reaches the OS memory
>>>>>>> limit and dies.
>>>>>>> Currently, we have a test cluster with 8 workers and 8 task slots
>>>>>>> per node. We have one job that uses 56 slots, and we cannot execute
>>>>>>> that job 5 times in a row because the whole cluster dies. If you
>>>>>>> want, we can publish our test job.
>>>>>>> Regards,
>>>>>>> We've tried to reproduce this problem with a test (see >>>>>>> https://issues.apache.org/jira/browse/FLINK-7845 [1]) but we >>>>>> don't >>>>>>> know whether the error produced by the test and the leak are >>>>>>> correlated.. >>>>>>> Best, >>>>>>> Flavio >>>>>>> On Wed, Nov 8, 2017 at 9:51 AM, =C3=87ET=C4=B0NKAYA EBRU = =C3=87ET=C4=B0NKAYA >>>>>> EBRU >>>>>>> wrote: >>>>>>> On 2017-11-07 16:53, Ufuk Celebi wrote: >>>>>>> Do you use any windowing? If yes, could you please share >>>>>> that code? >>>>>>> If >>>>>>> there is no stateful operation at all, it's strange where >>>>>> the list >>>>>>> state instances are coming from. >>>>>>> On Tue, Nov 7, 2017 at 2:35 PM, ebru >>>>>> >>>>>>> wrote: >>>>>>> Hi Ufuk, >>>>>>> We don=E2=80=99t explicitly define any state descriptor. We only >>>>>> use map >>>>>>> and filters >>>>>>> operator. We thought that gc handle clearing the flink=E2=80=99s >>>>>> internal >>>>>>> states. >>>>>>> So how can we manage the memory if it is always increasing? >>>>>>> - Ebru >>>>>>> On 7 Nov 2017, at 16:23, Ufuk Celebi wrote: >>>>>>> Hey Ebru, the memory usage might be increasing as long as a >>>>>> job is >>>>>>> running. >>>>>>> This is expected (also in the case of multiple running >>>>>> jobs). The >>>>>>> screenshots are not helpful in that regard. :-( >>>>>>> What kind of stateful operations are you using? Depending on >>>>>> your >>>>>>> use case, >>>>>>> you have to manually call `clear()` on the state instance in >>>>>> order >>>>>>> to >>>>>>> release the managed state. >>>>>>> Best, >>>>>>> Ufuk >>>>>>> On Tue, Nov 7, 2017 at 12:43 PM, ebru >>>>>>> wrote: >>>>>>> Begin forwarded message: >>>>>>> From: ebru >>>>>>> Subject: Re: Flink memory leak >>>>>>> Date: 7 November 2017 at 14:09:17 GMT+3 >>>>>>> To: Ufuk Celebi >>>>>>> Hi Ufuk, >>>>>>> There are there snapshots of htop output. >>>>>>> 1. snapshot is initial state. >>>>>>> 2. snapshot is after submitted one job. >>>>>>> 3. Snapshot is the output of the one job with 15000 EPS. And >>>>>> the >>>>>>> memory >>>>>>> usage is always increasing over time. >>>>>>> <1.png><2.png><3.png> >>>>>>> On 7 Nov 2017, at 13:34, Ufuk Celebi wrote: >>>>>>> Hey Ebru, >>>>>>> let me pull in Aljoscha (CC'd) who might have an idea what's >>>>>> causing >>>>>>> this. >>>>>>> Since multiple jobs are running, it will be hard to >>>>>> understand to >>>>>>> which job the state descriptors from the heap snapshot >>>>>> belong to. >>>>>>> - Is it possible to isolate the problem and reproduce the >>>>>> behaviour >>>>>>> with only a single job? >>>>>>> =E2=80=93 Ufuk >>>>>>> On Tue, Nov 7, 2017 at 10:27 AM, =C3=87ET=C4=B0NKAYA EBRU >>>>>> =C3=87ET=C4=B0NKAYA EBRU >>>>>>> wrote: >>>>>>> Hi, >>>>>>> We are using Flink 1.3.1 in production, we have one job >>>>>> manager and >>>>>>> 3 task >>>>>>> managers in standalone mode. Recently, we've noticed that we >>>>>> have >>>>>>> memory >>>>>>> related problems. We use docker container to serve Flink >>>>>> cluster. We >>>>>>> have >>>>>>> 300 slots and 20 jobs are running with parallelism of 10. >>>>>> Also the >>>>>>> job >>>>>>> count >>>>>>> may be change over time. Taskmanager memory usage always >>>>>> increases. >>>>>>> After >>>>>>> job cancelation this memory usage doesn't decrease. We've >>>>>> tried to >>>>>>> investigate the problem and we've got the task manager jvm >>>>>> heap >>>>>>> snapshot. >>>>>>> According to the jam heap analysis, possible memory leak was >>>>>> Flink >>>>>>> list >>>>>>> state descriptor. 
But we are not sure that is the cause of >>>>>> our >>>>>>> memory >>>>>>> problem. How can we solve the problem? >>>>>>> We have two types of Flink job. One has no state full >>>>>> operator >>>>>>> contains only maps and filters and the other has time window >>>>>> with >>>>>>> count trigger. >>>>>>> * We've analysed the jvm heaps again in different >>>>>> conditions. First >>>>>>> we analysed the snapshot when no flink jobs running on >>>>>> cluster. (image >>>>>>> 1) >>>>>>> * Then, we analysed the jvm heap snapshot when the flink job >>>>>> that has >>>>>>> no state full operator is running. And according to the >>>>>> results, leak >>>>>>> suspect was NetworkBufferPool (image 2) >>>>>>> * Last analys, there were both two types of jobs running >>>>>> and leak >>>>>>> suspect was again NetworkBufferPool. (image 3) >>>>>>> In our system jobs are regularly cancelled and resubmitted so >>>>>> we >>>>>>> noticed that when job is submitted some amount of memory >>>>>> allocated and >>>>>>> after cancelation this allocated memory never freed. So over >>>>>> time >>>>>>> memory usage is always increasing and exceeded the limits. >>>>>> Links: >>>>>> ------ >>>>>> [1] https://issues.apache.org/jira/browse/FLINK-7845 >>>>>> Hi Piotr, >>>>>> There are two types of jobs. >>>>>> In first, we use Kafka source and Kafka sink, there isn't any >>>>>> window operator. >>>>>>> In second job, we use Kafka source, filesystem sink and >>>>>> elastic search sink and window operator for buffering. >>>>>> Hi Piotrek, >>>>>> Thanks for your reply. >>>>>> We've tested our link cluster again. We have 360 slots, and our >>>>>> cluster configuration is like this; >>>>>> jobmanager.rpc.address: %JOBMANAGER% >>>>>> jobmanager.rpc.port: 6123 >>>>>> jobmanager.heap.mb: 1536 >>>>>> taskmanager.heap.mb: 1536 >>>>>> taskmanager.numberOfTaskSlots: 120 >>>>>> taskmanager.memory.preallocate: false >>>>>> parallelism.default: 1 >>>>>> jobmanager.web.port: 8081 >>>>>> state.backend: filesystem >>>>>> state.backend.fs.checkpointdir: file:///storage/%CHECKPOINTDIR% >>>>>> state.checkpoints.dir: file:///storage/%CHECKPOINTDIR% >>>>>> taskmanager.network.numberOfBuffers: 5000 >>>>>> We are using docker based Flink cluster. >>>>>> WE submitted 36 jobs with the parallelism of 10. After all slots >>>>>> became full. Memory usage were increasing by the time and one by = one >>>>>> task managers start to die. 
And the exception was like this; >>>>>> Taskmanager1 log: >>>>>> Uncaught error from thread = [flink-akka.actor.default-dispatcher-17] >>>>>> shutting down JVM since 'akka.jvm-exit-on-fatal-error' is enabled = for >>>>>> ActorSystem[flink] >>>>>> java.lang.NoClassDefFoundError: >>>>>> org/apache/kafka/common/metrics/stats/Rate$1 >>>>>> =E2=80=82=E2=80=82at >>>>>> org.apache.kafka.common.metrics.stats.Rate.convert(Rate.java:93) >>>>>> =E2=80=82=E2=80=82at >>>>>> org.apache.kafka.common.metrics.stats.Rate.measure(Rate.java:62) >>>>>> =E2=80=82=E2=80=82at >>>>>> = org.apache.kafka.common.metrics.KafkaMetric.value(KafkaMetric.java:61) >>>>>> =E2=80=82=E2=80=82at >>>>>> = org.apache.kafka.common.metrics.KafkaMetric.value(KafkaMetric.java:52) >>>>>> =E2=80=82=E2=80=82at >>>>>> = org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaMetricW= rapper.getValue(KafkaMetricWrapper.java:35) >>>>>> =E2=80=82=E2=80=82at >>>>>> = org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaMetricW= rapper.getValue(KafkaMetricWrapper.java:26) >>>>>> =E2=80=82=E2=80=82at >>>>>> = org.apache.flink.runtime.metrics.dump.MetricDumpSerialization.serializeGau= ge(MetricDumpSerialization.java:213) >>>>>> =E2=80=82=E2=80=82at >>>>>> = org.apache.flink.runtime.metrics.dump.MetricDumpSerialization.access$200(M= etricDumpSerialization.java:50) >>>>>> =E2=80=82=E2=80=82at >>>>>> = org.apache.flink.runtime.metrics.dump.MetricDumpSerialization$MetricDumpSe= rializer.serialize(MetricDumpSerialization.java:138) >>>>>> =E2=80=82=E2=80=82at >>>>>> = org.apache.flink.runtime.metrics.dump.MetricQueryService.onReceive(MetricQ= ueryService.java:109) >>>>>> =E2=80=82=E2=80=82at >>>>>> = akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:= 167) >>>>>> =E2=80=82=E2=80=82at = akka.actor.Actor$class.aroundReceive(Actor.scala:467) >>>>>> =E2=80=82=E2=80=82at = akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:97) >>>>>> =E2=80=82=E2=80=82at = akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) >>>>>> =E2=80=82=E2=80=82at = akka.actor.ActorCell.invoke(ActorCell.scala:487) >>>>>> =E2=80=82=E2=80=82at = akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238) >>>>>> =E2=80=82=E2=80=82at akka.dispatch.Mailbox.run(Mailbox.scala:220) >>>>>> =E2=80=82=E2=80=82at >>>>>> = akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractD= ispatcher.scala:397) >>>>>> =E2=80=82=E2=80=82at >>>>>> = scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) >>>>>> =E2=80=82=E2=80=82at >>>>>> = scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java= :1339) >>>>>> =E2=80=82=E2=80=82at >>>>>> = scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) >>>>>> =E2=80=82=E2=80=82at >>>>>> = scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.ja= va:107) >>>>>> Caused by: java.lang.ClassNotFoundException: >>>>>> org.apache.kafka.common.metrics.stats.Rate$1 >>>>>> =E2=80=82=E2=80=82at = java.net.URLClassLoader.findClass(URLClassLoader.java:381) >>>>>> =E2=80=82=E2=80=82at = java.lang.ClassLoader.loadClass(ClassLoader.java:424) >>>>>> =E2=80=82=E2=80=82at = java.lang.ClassLoader.loadClass(ClassLoader.java:357) >>>>>> =E2=80=82=E2=80=82... 
22 more >>>>>> Taskmanager2 log: >>>>>> Uncaught error from thread = [flink-akka.actor.default-dispatcher-17] >>>>>> shutting down JVM since 'akka.jvm-exit-on-fatal-error' is enabled = for >>>>>> ActorSystem[flink] >>>>>> Java.lang.NoClassDefFoundError: >>>>>> = org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher$1 >>>>>> =E2=80=82=E2=80=82at >>>>>> = org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher$Offs= etGauge.getValue(AbstractFetcher.java:492) >>>>>> =E2=80=82=E2=80=82at >>>>>> = org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher$Offs= etGauge.getValue(AbstractFetcher.java:480) >>>>>> =E2=80=82=E2=80=82at >>>>>> = org.apache.flink.runtime.metrics.dump.MetricDumpSerialization.serializeGau= ge(MetricDumpSerialization.java:213) >>>>>> =E2=80=82=E2=80=82at >>>>>> = org.apache.flink.runtime.metrics.dump.MetricDumpSerialization.access$200(M= etricDumpSerialization.java:50) >>>>>> =E2=80=82=E2=80=82at >>>>>> = org.apache.flink.runtime.metrics.dump.MetricDumpSerialization$MetricDumpSe= rializer.serialize(MetricDumpSerialization.java:138) >>>>>> =E2=80=82=E2=80=82at >>>>>> = org.apache.flink.runtime.metrics.dump.MetricQueryService.onReceive(MetricQ= ueryService.java:109) >>>>>> =E2=80=82=E2=80=82at >>>>>> = akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:= 167) >>>>>> =E2=80=82=E2=80=82at = akka.actor.Actor$class.aroundReceive(Actor.scala:467) >>>>>> =E2=80=82=E2=80=82at = akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:97) >>>>>> =E2=80=82=E2=80=82at = akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) >>>>>> =E2=80=82=E2=80=82at = akka.actor.ActorCell.invoke(ActorCell.scala:487) >>>>>> =E2=80=82=E2=80=82at = akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238) >>>>>> =E2=80=82=E2=80=82at akka.dispatch.Mailbox.run(Mailbox.scala:220) >>>>>> =E2=80=82=E2=80=82at >>>>>> = akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractD= ispatcher.scala:397) >>>>>> =E2=80=82=E2=80=82at >>>>>> = scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) >>>>>> =E2=80=82=E2=80=82at >>>>>> = scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java= :1339) >>>>>> =E2=80=82=E2=80=82at >>>>>> = scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) >>>>>> =E2=80=82=E2=80=82at >>>>>> = scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.ja= va:107) >>>>>> Caused by: java.lang.ClassNotFoundException: >>>>>> = org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher$1 >>>>>> =E2=80=82=E2=80=82at = java.net.URLClassLoader.findClass(URLClassLoader.java:381) >>>>>> =E2=80=82=E2=80=82at = java.lang.ClassLoader.loadClass(ClassLoader.java:424) >>>>>> =E2=80=82=E2=80=82at = java.lang.ClassLoader.loadClass(ClassLoader.java:357) >>>>>> =E2=80=82=E2=80=82... 18 more >>>>>> -Ebru >>>>> Hi Piotrek, >>>>> We attached the full log of the taskmanager1. >>>>> This may not be a dependency issue because until all of the task = slots is full, we didn't get any No Class Def Found exception, when = there is available memory jobs can run without exception for days. >>>>> Also there is Kafka Instance Already Exist exception in full log, = but this not relevant and doesn't effect jobs or task managers. >>>>> -Ebru >>> Hi, >>> Sorry we attached wrong log file. I've attached all task managers = and job manager's log. All task managers and job manager was = killed. 
> --Apple-Mail=_8241DE43-4FF9-4491-BB43-BD8AE94DB6DD Content-Transfer-Encoding: quoted-printable Content-Type: text/html; charset=utf-8 Ebru,= Javier, Flavio:

I = tried to reproduce memory leak by submitting a job, that was generating = classes with random names. And indeed I have found one. Memory was = accumulating in `char[]` instances that belonged to = `java.lang.ClassLoader#parallelLockMap`. OldGen memory pool was growing = in size up to the point I got:

java.lang.OutOfMemoryError: Java heap space

This seems like an old = known =E2=80=9Cfeature=E2=80=9D of JDK:

Can any of you confirm = that this is the issue that you are experiencing? If not, I would really = need more help/information from you to track this down.

Piotrek

On 10 Nov 2017, at 15:12, =C3=87ET=C4=B0NKAYA = EBRU =C3=87ET=C4=B0NKAYA EBRU <b20926247@cs.hacettepe.edu.tr> wrote:

On = 2017-11-10 13:14, Piotr Nowojski wrote:
jobmanager1.log and taskmanager2.log are the = same. Can you also submit
files containing std output?
Piotrek
On = 10 Nov 2017, at 09:35, =C3=87ET=C4=B0NKAYA EBRU =C3=87ET=C4=B0NKAYA EBRU = <b20926247@cs.hacettepe.edu.tr> wrote:
On = 2017-11-10 11:04, Piotr Nowojski wrote:
Hi,
Thanks for the logs, however = I do not see before mentioned exceptions
in it. It ends = with java.lang.InterruptedException
Is it the correct log = file? Also, could you attach the std output file
of the = failing TaskManager?
Piotrek
On 10 Nov 2017, at 08:42, =C3=87ET=C4=B0NKAYA = EBRU =C3=87ET=C4=B0NKAYA EBRU <b20926247@cs.hacettepe.edu.tr> wrote:
On = 2017-11-09 20:08, Piotr Nowojski wrote:
Hi,
Could you attach full logs = from those task managers? At first glance I
don=E2=80=99t = see a connection between those exceptions and any memory issue
that you might had. It looks like a dependency issue in one = (some?
All?) of your jobs.
Did you build = your jars with -Pbuild-jar profile as described here:
https://ci.apache.org/projects/flink/flink-docs-release-1.3/qui= ckstart/java_api_quickstart.html#build-project
?
If that doesn=E2=80=99t help. Can you binary search which job = is causing the
problem? There might be some Flink = incompatibility between different
versions and rebuilding = a job=E2=80=99s jar with a version matching to the
cluster = version might help.
Piotrek
On 9 Nov 2017, at 17:36, =C3=87ET=C4=B0NKAYA = EBRU =C3=87ET=C4=B0NKAYA EBRU
<b20926247@cs.hacettepe.edu.tr> wrote:
On = 2017-11-08 18:30, Piotr Nowojski wrote:
Btw, Ebru:
I don=E2=80=99t agree that the main suspect is = NetworkBufferPool. On your
screenshots it=E2=80=99s memory = consumption was reasonable and stable:
596MB
-> 602MB -> 597MB.
PoolThreadCache memory = usage ~120MB is also reasonable.
Do you experience any = problems, like Out Of Memory
errors/crashes/long
GC pauses? Or just JVM process is using more memory over = time? You
are
aware that JVM doesn=E2=80=99t = like to release memory back to OS once it
was
used? So increasing memory usage until hitting some limit = (for
example
JVM max heap size) is expected = behaviour.
Piotrek
On 8 Nov 2017, at 15:48, = Piotr Nowojski <piotr@data-artisans.com>
wrote:
I don=E2=80=99t know if this is relevant to this issue, but I = was
constantly getting failures trying to reproduce this = leak using your
Job, because you were using non = deterministic getKey function:
@Override
public Integer getKey(Integer event) {
Random = randomGen =3D new Random((new Date()).getTime());
return = randomGen.nextInt() % 8;
}
And quoting Java = doc of KeySelector:
"If invoked multiple times on the same = object, the returned key must
be the same.=E2=80=9D
I=E2=80=99m trying to reproduce this issue with following = job:
https://gist.github.com/pnowojski/b80f725c1af7668051c773438637e= 0d3
Where IntegerSource is just an infinite source, = DisardingSink is
well just discarding incoming data. I=E2=80= =99m cancelling the job every 5
seconds and so far (after = ~15 minutes) my memory consumption is
stable, well below = maximum java heap size.
Piotrek
On 8 Nov = 2017, at 15:28, Javier Lopez <javier.lopez@zalando.de>
wrote:
Yes, I tested with just printing the = stream. But it could take a
lot of time to fail.
On Wednesday, 8 November 2017, Piotr Nowojski
<piotr@data-artisans.com> wrote:
Thanks = for quick answer.
So it will also fail after some time = with `fromElements` source
instead of Kafka, right?
Did you try it also without a Kafka producer?
Piotrek
On 8 Nov 2017, at 14:57, Javier Lopez = <javier.lopez@zalando.de>
wrote:
Hi,
You don't need data. With data it will die faster. I tested = as
well with a small data set, using the fromElements = source, but it
will take some time to die. It's better = with some data.
On 8 November 2017 at 14:54, Piotr = Nowojski
<piotr@data-artisans.com> wrote:
Hi,
Thanks for sharing this job.
Do= I need to feed some data to the Kafka to reproduce this
issue with your script?
Does this = OOM issue also happen when you are not using the
Kafka source/sink?
Piotrek
On 8 Nov 2017, at 14:08, Javier Lopez = <javier.lopez@zalando.de>
wrote:
Hi,
This is the test flink job we created to trigger this leak
https://gist.github.com/javieredo/c60= 52404dbe6cc602e99f4669a09f7d6
And this is the python = script we are using to execute the job
thousands of times to get the OOM = problem
https://gist.github.com/javieredo/4825324d5d5f504e27ca6c004396a= 107
The cluster we used for this has this = configuration:
Instance type: t2.large
Number = of workers: 2
HeapMemory: 5500
Number of = task slots per node: 4
TaskMangMemFraction: 0.5
NumberOfNetworkBuffers: 2000
We have tried = several things, increasing the heap, reducing the
heap, more memory fraction, changes = this value in the
taskmanager.sh = "TM_MAX_OFFHEAP_SIZE=3D"2G"; and nothing seems to
work.
Thanks for your help.
On 8 November 2017 at = 13:26, =C3=87ET=C4=B0NKAYA EBRU =C3=87ET=C4=B0NKAYA EBRU
<b20926247@cs.hacettepe.edu.tr>= wrote:
On 2017-11-08 = 15:20, Piotr Nowojski wrote:
Hi Ebru and Javier,
Yes, if you could share this example job it would be = helpful.
Ebru: could you explain in a little more details = how does
your Job(s)
look like? Could you post some code? If you are = just using
maps and
filters there shouldn=E2=80=99t be any network = transfers involved,
aside
from Source and Sink = functions.
Piotrek
On 8 Nov 2017, at 12:54, = ebru
<b20926247@cs.hacettepe.edu.tr> = wrote:
Hi Javier,
It would be helpful if you share your test job with us.
Which configurations did you try?
-Ebru
On 8 Nov 2017, at 14:43, Javier Lopez
<javier.lopez@zalando.de>
wrote:
Hi,
We have been facing a similar problem. We = have tried some
different
configurations, as = proposed in other email thread by Flavio
and
Kien, but it didn't = work. We have a workaround similar to
the = one
that Flavio has, = we restart the taskmanagers once they reach
a
memory threshold. We created a small test to remove all of
our
dependencies and leave only flink native libraries. This
test reads
data from a Kafka topic and writes it back to another = topic
in
Kafka. We cancel the job and start another = every 5 seconds.
After
~30 minutes of doing = this process, the cluster reaches the
OS = memory
limit and = dies.
Currently, we have a test cluster with 8 workers and = 8 task
slots
per node. We have one job that uses 56 slots, = and we cannot
execute
that job 5 times in a row because the whole = cluster dies. If
you
want, we can publish our test job.
Regards,
On 8 November 2017 at 11:20, Aljoscha = Krettek
<aljoscha@apache.org>
wrote:
@Nico= & @Piotr Could you please have a look at this? You
both
recently worked on the network stack and might be most
familiar with
this.
On 8. Nov 2017, at 10:25, = Flavio Pompermaier
<pompermaier@okkam.it>
wrote:
We = also have the same problem in production. At the moment
the
solution is to restart the entire Flink cluster after = every
job..
We've tried to reproduce this problem with a = test (see
https://issues.apache.org/jira/browse/FLINK-7845 = [1]) but we
don't
know whether the error produced by the test and = the leak are
correlated..
Best,
Flavio
On Wed, Nov 8, 2017 at 9:51 AM, = =C3=87ET=C4=B0NKAYA EBRU =C3=87ET=C4=B0NKAYA
EBRU
<b20926247@cs.hacettepe.edu.tr> wrote:
On = 2017-11-07 16:53, Ufuk Celebi wrote:
Do you use any = windowing? If yes, could you please share
that = code?
If
there is no stateful operation at all, it's strange where
the list
state instances are coming from.
On Tue, Nov 7, = 2017 at 2:35 PM, ebru
<b20926247@cs.hacettepe.edu.tr>
wrote:
Hi = Ufuk,
We don=E2=80=99t explicitly define any state = descriptor. We only
use map
and filters
operator. We thought that gc handle clearing the flink=E2=80=99= s
internal
states.
So how can we manage the = memory if it is always increasing?
- Ebru
On = 7 Nov 2017, at 16:23, Ufuk Celebi <uce@apache.org> wrote:
Hey Ebru, the memory usage might be increasing as long as = a
job is
running.
This is expected (also = in the case of multiple running
jobs). The
screenshots are not = helpful in that regard. :-(
What kind of stateful = operations are you using? Depending on
your
use case,
you have to manually call `clear()` on the state instance = in
order
to
release the managed state.
Best,
Ufuk
On Tue, Nov 7, 2017 at = 12:43 PM, ebru
<b20926247@cs.hacettepe.edu.tr> = wrote:
Begin forwarded message:
From: ebru = <b20926247@cs.hacettepe.edu.tr>
Subject: Re: Flink = memory leak
Date: 7 November 2017 at 14:09:17 GMT+3
To: Ufuk Celebi <uce@apache.org>
Hi = Ufuk,
There are there snapshots of htop output.
1. snapshot is initial state.
2. snapshot is = after submitted one job.
3. Snapshot is the output of the = one job with 15000 EPS. And
the
memory
usage= is always increasing over time.
<1.png><2.png><3.png>
On 7 = Nov 2017, at 13:34, Ufuk Celebi <uce@apache.org> wrote:
Hey Ebru,
let me pull in Aljoscha (CC'd) who = might have an idea what's
causing
this.
Since = multiple jobs are running, it will be hard to
understand to
which job the state descriptors from the heap = snapshot
belong to.
- Is it possible to isolate the problem and = reproduce the
behaviour
with only a single = job?
=E2=80=93 Ufuk
On Tue, Nov 7, 2017 at = 10:27 AM, =C3=87ET=C4=B0NKAYA EBRU
=C3=87ET=C4=B0= NKAYA EBRU
<b20926247@cs.hacettepe.edu.tr> wrote:
Hi,
We are using Flink 1.3.1 in production, we = have one job
manager and
3 task
managers in standalone mode. Recently, we've noticed that = we
have
memory
related problems. We use docker = container to serve Flink
cluster. We
have
300 = slots and 20 jobs are running with parallelism of 10.
Also the
job
count
may be change over = time. Taskmanager memory usage always
increases.
After
job cancelation this memory usage = doesn't decrease. We've
tried to
investigate the problem = and we've got the task manager jvm
heap
snapshot.
According to the jam heap analysis, possible memory leak = was
Flink
list
state descriptor. But we are = not sure that is the cause of
our
memory
problem. How can we solve the problem?
We have = two types of Flink job. One has no state full
operator
contains only maps and filters and the other has time = window
with
count trigger.
* We've analysed = the jvm heaps again in different
conditions. = First
we analysed the = snapshot when no flink jobs running on
cluster.= (image
1)
* Then, we analysed the jvm heap snapshot when the flink = job
that has
no state full operator is running. And = according to the
results, leak
suspect was = NetworkBufferPool (image 2)
*   Last analys, = there were both two types of jobs running
and = leak
suspect was = again NetworkBufferPool. (image 3)
In our system jobs are = regularly cancelled and resubmitted so
we
noticed that when job is = submitted some amount of memory
allocated = and
after cancelation = this allocated memory never freed. So over
time
memory usage is always increasing and exceeded the limits.
Links:
------
[1] = https://issues.apache.org/jira/browse/FLINK-7845
Hi = Piotr,
There are two types of jobs.
In = first, we use Kafka source and Kafka sink, there isn't any
window operator.
In second job, we use Kafka source, filesystem sink and
elastic search sink and window operator for = buffering.
Hi Piotrek,
Thanks for your = reply.
We've tested our link cluster again. We have 360 = slots, and our
cluster configuration is like this;
jobmanager.rpc.address: %JOBMANAGER%
jobmanager.rpc.port: 6123
jobmanager.heap.mb: = 1536
taskmanager.heap.mb: 1536
taskmanager.numberOfTaskSlots: 120
taskmanager.memory.preallocate: false
parallelism.default: 1
jobmanager.web.port: = 8081
state.backend: filesystem
state.backend.fs.checkpointdir: = file:///storage/%CHECKPOINTDIR%
state.checkpoints.dir: = file:///storage/%CHECKPOINTDIR%
taskmanager.network.numberOfBuffers: 5000
We = are using docker based Flink cluster.
WE submitted 36 jobs = with the parallelism of 10. After all slots
became full. = Memory usage were increasing by the time and one by one
task= managers start to die. And the exception was like this;
Taskmanager1 log:
Uncaught error from thread = [flink-akka.actor.default-dispatcher-17]
shutting down JVM = since 'akka.jvm-exit-on-fatal-error' is enabled for
ActorSystem[flink]
java.lang.NoClassDefFoundError:
org/apache/kafka/common/metrics/stats/Rate$1
=E2=80=82=E2=80=82at
org.apache.kafka.common.metrics.stats.Rate.convert(Rate.java:93= )
=E2=80=82=E2=80=82at
org.apache.kafka.common.metrics.stats.Rate.measure(Rate.java:62= )
=E2=80=82=E2=80=82at
org.apache.kafka.common.metrics.KafkaMetric.value(KafkaMetric.j= ava:61)
=E2=80=82=E2=80=82at
org.apache.kafka.common.metrics.KafkaMetric.value(KafkaMetric.j= ava:52)
=E2=80=82=E2=80=82at
org.apache.flink.streaming.connectors.kafka.internals.metrics.K= afkaMetricWrapper.getValue(KafkaMetricWrapper.java:35)
=E2=80=82=E2=80=82at
org.apache.flink.streaming.connectors.kafka.internals.metrics.K= afkaMetricWrapper.getValue(KafkaMetricWrapper.java:26)
=E2=80=82=E2=80=82at
org.apache.flink.runtime.metrics.dump.MetricDumpSerialization.s= erializeGauge(MetricDumpSerialization.java:213)
=E2=80=82=E2= =80=82at
org.apache.flink.runtime.metrics.dump.MetricDumpSerialization.a= ccess$200(MetricDumpSerialization.java:50)
=E2=80=82=E2=80=82= at
org.apache.flink.runtime.metrics.dump.MetricDumpSerialization$M= etricDumpSerializer.serialize(MetricDumpSerialization.java:138)
=E2=80=82=E2=80=82at
org.apache.flink.runtime.metrics.dump.MetricQueryService.onRece= ive(MetricQueryService.java:109)
=E2=80=82=E2=80=82at
akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedA= ctor.scala:167)
=E2=80=82=E2=80=82at = akka.actor.Actor$class.aroundReceive(Actor.scala:467)
=E2=80=82=E2=80=82at = akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:97)
=E2=80=82=E2=80=82at = akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
=E2=80=82=E2=80=82at = akka.actor.ActorCell.invoke(ActorCell.scala:487)
=E2=80=82=E2= =80=82at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
=E2=80=82=E2=80=82at = akka.dispatch.Mailbox.run(Mailbox.scala:220)
=E2=80=82=E2=80= =82at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exe= c(AbstractDispatcher.scala:397)
=E2=80=82=E2=80=82at
scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java= :260)
=E2=80=82=E2=80=82at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJo= inPool.java:1339)
=E2=80=82=E2=80=82at
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.j= ava:1979)
=E2=80=82=E2=80=82at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWork= erThread.java:107)
Caused by: = java.lang.ClassNotFoundException:
org.apache.kafka.common.metrics.stats.Rate$1
=E2=80=82=E2=80=82at = java.net.URLClassLoader.findClass(URLClassLoader.java:381)
=E2=80=82=E2=80=82at = java.lang.ClassLoader.loadClass(ClassLoader.java:424)
=E2=80=82=E2=80=82at = java.lang.ClassLoader.loadClass(ClassLoader.java:357)
=E2=80=82=E2=80=82... 22 more
Taskmanager2 = log:
Uncaught error from thread = [flink-akka.actor.default-dispatcher-17]
shutting down JVM = since 'akka.jvm-exit-on-fatal-error' is enabled for
ActorSystem[flink]
Java.lang.NoClassDefFoundError:
org/apache/flink/streaming/connectors/kafka/internals/AbstractF= etcher$1
=E2=80=82=E2=80=82at
org.apache.flink.streaming.connectors.kafka.internals.AbstractF= etcher$OffsetGauge.getValue(AbstractFetcher.java:492)
=E2=80=82=E2=80=82at
org.apache.flink.streaming.connectors.kafka.internals.AbstractF= etcher$OffsetGauge.getValue(AbstractFetcher.java:480)
=E2=80=82=E2=80=82at
org.apache.flink.runtime.metrics.dump.MetricDumpSerialization.s= erializeGauge(MetricDumpSerialization.java:213)
=E2=80=82=E2= =80=82at
org.apache.flink.runtime.metrics.dump.MetricDumpSerialization.a= ccess$200(MetricDumpSerialization.java:50)
=E2=80=82=E2=80=82= at
org.apache.flink.runtime.metrics.dump.MetricDumpSerialization$M= etricDumpSerializer.serialize(MetricDumpSerialization.java:138)
=E2=80=82=E2=80=82at
org.apache.flink.runtime.metrics.dump.MetricQueryService.onRece= ive(MetricQueryService.java:109)
=E2=80=82=E2=80=82at
akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedA= ctor.scala:167)
=E2=80=82=E2=80=82at = akka.actor.Actor$class.aroundReceive(Actor.scala:467)
=E2=80=82=E2=80=82at = akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:97)
=E2=80=82=E2=80=82at = akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
=E2=80=82=E2=80=82at = akka.actor.ActorCell.invoke(ActorCell.scala:487)
=E2=80=82=E2= =80=82at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
=E2=80=82=E2=80=82at = akka.dispatch.Mailbox.run(Mailbox.scala:220)
=E2=80=82=E2=80= =82at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exe= c(AbstractDispatcher.scala:397)
=E2=80=82=E2=80=82at
scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java= :260)
=E2=80=82=E2=80=82at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJo= inPool.java:1339)
=E2=80=82=E2=80=82at
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.j= ava:1979)
=E2=80=82=E2=80=82at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWork= erThread.java:107)
Caused by: = java.lang.ClassNotFoundException:
org.apache.flink.streaming.connectors.kafka.internals.AbstractF= etcher$1
=E2=80=82=E2=80=82at = java.net.URLClassLoader.findClass(URLClassLoader.java:381)
=E2=80=82=E2=80=82at = java.lang.ClassLoader.loadClass(ClassLoader.java:424)
=E2=80=82=E2=80=82at = java.lang.ClassLoader.loadClass(ClassLoader.java:357)
=E2=80=82=E2=80=82... 18 more
-Ebru
Hi Piotrek,
We attached the full = log of the taskmanager1.
This may not be a dependency = issue because until all of the task slots is full, we didn't get any No = Class Def Found exception, when there is available memory jobs can run = without exception for days.
Also there is Kafka Instance = Already Exist exception in full log, but this not relevant and doesn't = effect jobs or task managers.
-Ebru<taskmanager1.log.zip>
Hi,
Sorry we attached = wrong log file. I've attached all task managers and job manager's log. = All task managers and job manager was killed.<logs.zip>
<logs2-2.zip&= gt;

= --Apple-Mail=_8241DE43-4FF9-4491-BB43-BD8AE94DB6DD--