flink-user mailing list archives

From Flavio Pompermaier <pomperma...@okkam.it>
Subject Re: Flink memory leak
Date Tue, 14 Nov 2017 15:08:53 GMT
What should we do to confirm it? Do you have a GitHub repo we could start from?

On Tue, Nov 14, 2017 at 4:02 PM, Piotr Nowojski <piotr@data-artisans.com>
wrote:

> Ebru, Javier, Flavio:
>
> I tried to reproduce the memory leak by submitting a job that generated
> classes with random names, and indeed I found one. Memory was
> accumulating in `char[]` instances that belonged to
> `java.lang.ClassLoader#parallelLockMap`. The OldGen memory pool kept
> growing in size until I got:
>
> java.lang.OutOfMemoryError: Java heap space
>
> This seems like an old known “feature” of the JDK:
> https://bugs.openjdk.java.net/browse/JDK-8037342
>
> Can any of you confirm that this is the issue that you are experiencing?
> If not, I would really need more help/information from you to track this
> down.
>
> Piotrek
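
To check whether a TaskManager shows the same OldGen growth described above, the heap pools can be read from inside the JVM through the standard `java.lang.management` API. The following is only a minimal sketch: the class name `HeapPoolReport` is made up for illustration, and the pool names it prints (for example "PS Old Gen" or "G1 Old Gen") depend on the garbage collector in use.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryPoolMXBean;
import java.lang.management.MemoryType;
import java.lang.management.MemoryUsage;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Flink: lists the JVM heap memory pools.
public class HeapPoolReport {

    /** Returns one line per heap pool with its used and maximum size in MB. */
    static List<String> heapPoolLines() {
        List<String> lines = new ArrayList<>();
        for (MemoryPoolMXBean pool : ManagementFactory.getMemoryPoolMXBeans()) {
            if (pool.getType() != MemoryType.HEAP) {
                continue; // skip non-heap pools such as Metaspace or the code cache
            }
            MemoryUsage usage = pool.getUsage();
            if (usage == null) {
                continue; // the pool is no longer valid
            }
            long usedMb = usage.getUsed() >> 20;
            long max = usage.getMax(); // -1 means the maximum is undefined
            String maxText = max < 0 ? "undefined" : (max >> 20) + "MB";
            lines.add(pool.getName() + ": used=" + usedMb + "MB max=" + maxText);
        }
        return lines;
    }

    public static void main(String[] args) {
        // Print a snapshot; calling this periodically shows whether the old
        // generation keeps growing across job submissions and cancellations.
        heapPoolLines().forEach(System.out::println);
    }
}
```

If the old-generation line keeps climbing across many submit/cancel cycles even after full GCs, that would be consistent with the behaviour described above, although it does not by itself identify `parallelLockMap` as the cause; a heap dump is still needed for that.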
>
> On 10 Nov 2017, at 15:12, ÇETİNKAYA EBRU ÇETİNKAYA EBRU
> <b20926247@cs.hacettepe.edu.tr> wrote:
>
> On 2017-11-10 13:14, Piotr Nowojski wrote:
>
> jobmanager1.log and taskmanager2.log are the same. Can you also submit
> files containing std output?
> Piotrek
>
> On 10 Nov 2017, at 09:35, ÇETİNKAYA EBRU ÇETİNKAYA EBRU
> <b20926247@cs.hacettepe.edu.tr> wrote:
> On 2017-11-10 11:04, Piotr Nowojski wrote:
>
> Hi,
> Thanks for the logs; however, I do not see the previously mentioned
> exceptions in them. The log ends with java.lang.InterruptedException.
> Is it the correct log file? Also, could you attach the std output file
> of the failing TaskManager?
> Piotrek
>
> On 10 Nov 2017, at 08:42, ÇETİNKAYA EBRU ÇETİNKAYA EBRU
> <b20926247@cs.hacettepe.edu.tr> wrote:
> On 2017-11-09 20:08, Piotr Nowojski wrote:
>
> Hi,
> Could you attach full logs from those task managers? At first glance I
> don’t see a connection between those exceptions and any memory issue
> that you might have had. It looks like a dependency issue in one (some?
> all?) of your jobs.
> Did you build your jars with the -Pbuild-jar profile as described here:
> https://ci.apache.org/projects/flink/flink-docs-release-1.3/quickstart/java_api_quickstart.html#build-project
> ?
> If that doesn’t help, can you binary search which job is causing the
> problem? There might be some Flink incompatibility between different
> versions, and rebuilding a job’s jar with a version matching the
> cluster version might help.
> Piotrek
>
> On 9 Nov 2017, at 17:36, ÇETİNKAYA EBRU ÇETİNKAYA EBRU
> <b20926247@cs.hacettepe.edu.tr> wrote:
> On 2017-11-08 18:30, Piotr Nowojski wrote:
> Btw, Ebru:
> I don’t agree that the main suspect is NetworkBufferPool. On your
> screenshots its memory consumption was reasonable and stable:
> 596MB -> 602MB -> 597MB.
> PoolThreadCache memory usage of ~120MB is also reasonable.
> Do you experience any problems, like Out Of Memory errors, crashes or
> long GC pauses? Or is the JVM process just using more memory over time?
> You are aware that the JVM doesn’t like to release memory back to the OS
> once it was used? So increasing memory usage until hitting some limit
> (for example the JVM max heap size) is expected behaviour.
> Piotrek
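
The distinction made above, between memory the JVM has claimed from the OS and memory that live objects actually occupy, can be observed with the standard `MemoryMXBean`. This is a rough sketch rather than a diagnostic tool; the class name `HeapUsageSnapshot` and the size of the throwaway allocation are arbitrary choices for illustration.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryUsage;

// Hypothetical helper: compares used heap with committed heap.
public class HeapUsageSnapshot {

    /** Current heap usage as reported by the JVM. */
    static MemoryUsage heap() {
        return ManagementFactory.getMemoryMXBean().getHeapMemoryUsage();
    }

    /** Formats used, committed and max heap in MB (max may be -1 if undefined). */
    static String describe(MemoryUsage u) {
        long max = u.getMax();
        return "used=" + (u.getUsed() >> 20) + "MB"
                + " committed=" + (u.getCommitted() >> 20) + "MB"
                + " max=" + (max < 0 ? "undefined" : (max >> 20) + "MB");
    }

    public static void main(String[] args) {
        System.out.println("start:    " + describe(heap()));

        // Allocate some short-lived garbage (64 x 1 MB) and drop the references.
        byte[][] garbage = new byte[64][];
        for (int i = 0; i < garbage.length; i++) {
            garbage[i] = new byte[1 << 20];
        }
        System.out.println("after allocation: " + describe(heap()));
        garbage = null;

        // Request a collection. "used" usually drops afterwards, while
        // "committed" (what the OS sees) often stays at its higher level.
        System.gc();
        System.out.println("after GC: " + describe(heap()));
    }
}
```

A process-level view such as htop roughly tracks the committed figure plus off-heap memory, so a rising number there is not by itself evidence of a leak; a "used" value that stays high after full GCs is the more telling signal.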
> On 8 Nov 2017, at 15:48, Piotr Nowojski <piotr@data-artisans.com>
> wrote:
> I don’t know if this is relevant to this issue, but I was
> constantly getting failures trying to reproduce this leak using your
> job, because you were using a non-deterministic getKey function:
>
>     @Override
>     public Integer getKey(Integer event) {
>         Random randomGen = new Random((new Date()).getTime());
>         return randomGen.nextInt() % 8;
>     }
>
> And quoting the Javadoc of KeySelector:
> "If invoked multiple times on the same object, the returned key must
> be the same."
> I’m trying to reproduce this issue with the following job:
> https://gist.github.com/pnowojski/b80f725c1af7668051c773438637e0d3
> where IntegerSource is just an infinite source and DiscardingSink
> just discards incoming data. I’m cancelling the job every 5
> seconds and so far (after ~15 minutes) my memory consumption is
> stable, well below the maximum Java heap size.
> Piotrek
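
For comparison with the `getKey` quoted above, here is a minimal sketch of a key function that satisfies the quoted KeySelector contract by deriving the key only from the event. To keep it runnable without Flink on the classpath, the `KeySelector` interface below is a local stand-in written for this example, not Flink's own class, and `ModuloKeySelector` and `NUM_KEYS` are hypothetical names.

```java
public class DeterministicKeyExample {

    /** Local stand-in that mirrors the shape of a key selector; NOT Flink's class. */
    interface KeySelector<IN, KEY> {
        KEY getKey(IN value) throws Exception;
    }

    /** Number of distinct keys, matching the "% 8" in the quoted job. */
    static final int NUM_KEYS = 8;

    /** The key depends only on the event, so repeated calls return the same key. */
    static final class ModuloKeySelector implements KeySelector<Integer, Integer> {
        @Override
        public Integer getKey(Integer event) {
            // floorMod keeps the key in [0, NUM_KEYS) even for negative events,
            // whereas the "%" operator can return negative values.
            return Math.floorMod(event, NUM_KEYS);
        }
    }

    public static void main(String[] args) {
        ModuloKeySelector selector = new ModuloKeySelector();
        for (int event : new int[] {-3, 0, 13}) {
            // Calling getKey twice on the same event yields the same key.
            System.out.println(event + " -> " + selector.getKey(event)
                    + " (again: " + selector.getKey(event) + ")");
        }
    }
}
```

If the intent of the original job was simply to spread events over 8 keys, a scheme like this keeps the spread while remaining stable across repeated invocations; whether it changes the leak behaviour is a separate question.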
> On 8 Nov 2017, at 15:28, Javier Lopez <javier.lopez@zalando.de>
> wrote:
> Yes, I tested with just printing the stream. But it could take a
> lot of time to fail.
> On Wednesday, 8 November 2017, Piotr Nowojski
> <piotr@data-artisans.com> wrote:
> Thanks for the quick answer.
> So it will also fail after some time with `fromElements` source
> instead of Kafka, right?
> Did you try it also without a Kafka producer?
> Piotrek
> On 8 Nov 2017, at 14:57, Javier Lopez <javier.lopez@zalando.de>
> wrote:
> Hi,
> You don't need data. With data it will die faster. I tested as
> well with a small data set, using the fromElements source, but it
> will take some time to die. It's better with some data.
> On 8 November 2017 at 14:54, Piotr Nowojski
> <piotr@data-artisans.com> wrote:
> Hi,
> Thanks for sharing this job.
> Do I need to feed some data to Kafka to reproduce this issue with
> your script?
> Does this OOM issue also happen when you are not using the Kafka
> source/sink?
>
> Piotrek
> On 8 Nov 2017, at 14:08, Javier Lopez <javier.lopez@zalando.de> wrote:
> Hi,
> This is the test Flink job we created to trigger this leak:
> https://gist.github.com/javieredo/c6052404dbe6cc602e99f4669a09f7d6
> And this is the Python script we are using to execute the job
> thousands of times to get the OOM problem:
> https://gist.github.com/javieredo/4825324d5d5f504e27ca6c004396a107
> The cluster we used for this has this configuration:
> Instance type: t2.large
> Number of workers: 2
> HeapMemory: 5500
> Number of task slots per node: 4
> TaskMangMemFraction: 0.5
> NumberOfNetworkBuffers: 2000
> We have tried several things: increasing the heap, reducing the
> heap, a larger memory fraction, and changing this value in
> taskmanager.sh: TM_MAX_OFFHEAP_SIZE="2G". Nothing seems to work.
> Thanks for your help.
> On 8 November 2017 at 13:26, ÇETİNKAYA EBRU ÇETİNKAYA EBRU
> <b20926247@cs.hacettepe.edu.tr> wrote:
> On 2017-11-08 15:20, Piotr Nowojski wrote:
> Hi Ebru and Javier,
> Yes, if you could share this example job it would be helpful.
> Ebru: could you explain in a little more detail what your job(s)
> look like? Could you post some code? If you are just using maps and
> filters there shouldn’t be any network transfers involved, aside
> from Source and Sink functions.
> Piotrek
> On 8 Nov 2017, at 12:54, ebru <b20926247@cs.hacettepe.edu.tr> wrote:
>
> Hi Javier,
> It would be helpful if you share your test job with us.
> Which configurations did you try?
> -Ebru
> On 8 Nov 2017, at 14:43, Javier Lopez <javier.lopez@zalando.de> wrote:
> Hi,
> We have been facing a similar problem. We have tried some different
> configurations, as proposed in another email thread by Flavio and
> Kien, but they didn't work. We have a workaround similar to the one
> that Flavio has: we restart the taskmanagers once they reach a
> memory threshold. We created a small test to remove all of our
> dependencies and leave only Flink native libraries. This test reads
> data from a Kafka topic and writes it back to another topic in
> Kafka. We cancel the job and start another every 5 seconds. After
> ~30 minutes of doing this process, the cluster reaches the OS memory
> limit and dies.
> Currently, we have a test cluster with 8 workers and 8 task slots
> per node. We have one job that uses 56 slots, and we cannot execute
> that job 5 times in a row because the whole cluster dies. If you
> want, we can publish our test job.
> Regards,
> On 8 November 2017 at 11:20, Aljoscha Krettek <aljoscha@apache.org> wrote:
> @Nico & @Piotr Could you please have a look at this? You both
> recently worked on the network stack and might be most familiar with
> this.
> On 8. Nov 2017, at 10:25, Flavio Pompermaier <pompermaier@okkam.it> wrote:
> We also have the same problem in production. At the moment the
> solution is to restart the entire Flink cluster after every job.
> We've tried to reproduce this problem with a test (see
> https://issues.apache.org/jira/browse/FLINK-7845 [1]) but we don't
> know whether the error produced by the test and the leak are
> correlated.
> Best,
> Flavio
> On Wed, Nov 8, 2017 at 9:51 AM, ÇETİNKAYA EBRU ÇETİNKAYA EBRU
> <b20926247@cs.hacettepe.edu.tr> wrote:
> On 2017-11-07 16:53, Ufuk Celebi wrote:
> Do you use any windowing? If yes, could you please share that code? If
> there is no stateful operation at all, it's strange where the list
> state instances are coming from.
> On Tue, Nov 7, 2017 at 2:35 PM, ebru <b20926247@cs.hacettepe.edu.tr> wrote:
> Hi Ufuk,
> We don’t explicitly define any state descriptor. We only use map
> and filter operators. We thought that the GC handles clearing Flink’s
> internal states.
> So how can we manage the memory if it is always increasing?
> - Ebru
> On 7 Nov 2017, at 16:23, Ufuk Celebi <uce@apache.org> wrote:
> Hey Ebru, the memory usage might be increasing as long as a job is
> running.
> This is expected (also in the case of multiple running jobs). The
> screenshots are not helpful in that regard. :-(
> What kind of stateful operations are you using? Depending on your
> use case, you have to manually call `clear()` on the state instance in
> order to release the managed state.
> Best,
> Ufuk
> On Tue, Nov 7, 2017 at 12:43 PM, ebru
> <b20926247@cs.hacettepe.edu.tr> wrote:
> Begin forwarded message:
> From: ebru <b20926247@cs.hacettepe.edu.tr>
> Subject: Re: Flink memory leak
> Date: 7 November 2017 at 14:09:17 GMT+3
> To: Ufuk Celebi <uce@apache.org>
> Hi Ufuk,
> There are three snapshots of htop output.
> 1. Snapshot of the initial state.
> 2. Snapshot after submitting one job.
> 3. Snapshot of the output of the one job with 15000 EPS. The memory
> usage is always increasing over time.
> <1.png><2.png><3.png>
> On 7 Nov 2017, at 13:34, Ufuk Celebi <uce@apache.org> wrote:
> Hey Ebru,
> let me pull in Aljoscha (CC'd) who might have an idea what's causing
> this.
> Since multiple jobs are running, it will be hard to understand to
> which job the state descriptors from the heap snapshot belong.
> - Is it possible to isolate the problem and reproduce the behaviour
> with only a single job?
> – Ufuk
> On Tue, Nov 7, 2017 at 10:27 AM, ÇETİNKAYA EBRU ÇETİNKAYA EBRU
> <b20926247@cs.hacettepe.edu.tr> wrote:
> Hi,
> We are using Flink 1.3.1 in production; we have one job manager and
> 3 task managers in standalone mode. Recently, we've noticed that we
> have memory-related problems. We use Docker containers to serve the
> Flink cluster. We have 300 slots, and 20 jobs are running with a
> parallelism of 10. The job count may also change over time.
> Taskmanager memory usage always increases. After job cancellation this
> memory usage doesn't decrease. We've tried to investigate the problem
> and we've got the task manager JVM heap snapshot.
> According to the JVM heap analysis, the possible memory leak was the
> Flink list state descriptor. But we are not sure that is the cause of
> our memory problem. How can we solve the problem?
> We have two types of Flink job. One has no stateful operator and
> contains only maps and filters, and the other has a time window with
> a count trigger.
> * We've analysed the JVM heaps again in different conditions. First
> we analysed the snapshot when no Flink jobs were running on the
> cluster. (image 1)
> * Then, we analysed the JVM heap snapshot when the Flink job that has
> no stateful operator was running. According to the results, the leak
> suspect was NetworkBufferPool. (image 2)
> * In the last analysis, both types of jobs were running and the leak
> suspect was again NetworkBufferPool. (image 3)
> In our system jobs are regularly cancelled and resubmitted, so we
> noticed that when a job is submitted some amount of memory is
> allocated, and after cancellation this allocated memory is never
> freed. So over time memory usage is always increasing and exceeds the
> limits.
>
> Links:
> ------
> [1] https://issues.apache.org/jira/browse/FLINK-7845
> Hi Piotr,
> There are two types of jobs.
> In the first, we use a Kafka source and a Kafka sink; there isn't any
> window operator.
> In the second job, we use a Kafka source, a filesystem sink, an
> Elasticsearch sink, and a window operator for buffering.
> Hi Piotrek,
> Thanks for your reply.
> We've tested our Flink cluster again. We have 360 slots, and our
> cluster configuration is like this:
> jobmanager.rpc.address: %JOBMANAGER%
> jobmanager.rpc.port: 6123
> jobmanager.heap.mb: 1536
> taskmanager.heap.mb: 1536
> taskmanager.numberOfTaskSlots: 120
> taskmanager.memory.preallocate: false
> parallelism.default: 1
> jobmanager.web.port: 8081
> state.backend: filesystem
> state.backend.fs.checkpointdir: file:///storage/%CHECKPOINTDIR%
> state.checkpoints.dir: file:///storage/%CHECKPOINTDIR%
> taskmanager.network.numberOfBuffers: 5000
> We are using a Docker-based Flink cluster.
> We submitted 36 jobs with a parallelism of 10. After all slots
> became full, memory usage kept increasing over time and, one by one,
> the task managers started to die. The exception was like this:
> Taskmanager1 log:
> Uncaught error from thread [flink-akka.actor.default-dispatcher-17]
> shutting down JVM since 'akka.jvm-exit-on-fatal-error' is enabled for
> ActorSystem[flink]
> java.lang.NoClassDefFoundError: org/apache/kafka/common/metrics/stats/Rate$1
>     at org.apache.kafka.common.metrics.stats.Rate.convert(Rate.java:93)
>     at org.apache.kafka.common.metrics.stats.Rate.measure(Rate.java:62)
>     at org.apache.kafka.common.metrics.KafkaMetric.value(KafkaMetric.java:61)
>     at org.apache.kafka.common.metrics.KafkaMetric.value(KafkaMetric.java:52)
>     at org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaMetricWrapper.getValue(KafkaMetricWrapper.java:35)
>     at org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaMetricWrapper.getValue(KafkaMetricWrapper.java:26)
>     at org.apache.flink.runtime.metrics.dump.MetricDumpSerialization.serializeGauge(MetricDumpSerialization.java:213)
>     at org.apache.flink.runtime.metrics.dump.MetricDumpSerialization.access$200(MetricDumpSerialization.java:50)
>     at org.apache.flink.runtime.metrics.dump.MetricDumpSerialization$MetricDumpSerializer.serialize(MetricDumpSerialization.java:138)
>     at org.apache.flink.runtime.metrics.dump.MetricQueryService.onReceive(MetricQueryService.java:109)
>     at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:167)
>     at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
>     at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:97)
>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>     at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
>     at akka.dispatch.Mailbox.run(Mailbox.scala:220)
>     at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.lang.ClassNotFoundException: org.apache.kafka.common.metrics.stats.Rate$1
>     at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>     at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>     at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>     ... 22 more
> Taskmanager2 log:
> Uncaught error from thread [flink-akka.actor.default-dispatcher-17]
> shutting down JVM since 'akka.jvm-exit-on-fatal-error' is enabled for
> ActorSystem[flink]
> java.lang.NoClassDefFoundError: org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher$1
>     at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher$OffsetGauge.getValue(AbstractFetcher.java:492)
>     at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher$OffsetGauge.getValue(AbstractFetcher.java:480)
>     at org.apache.flink.runtime.metrics.dump.MetricDumpSerialization.serializeGauge(MetricDumpSerialization.java:213)
>     at org.apache.flink.runtime.metrics.dump.MetricDumpSerialization.access$200(MetricDumpSerialization.java:50)
>     at org.apache.flink.runtime.metrics.dump.MetricDumpSerialization$MetricDumpSerializer.serialize(MetricDumpSerialization.java:138)
>     at org.apache.flink.runtime.metrics.dump.MetricQueryService.onReceive(MetricQueryService.java:109)
>     at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:167)
>     at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
>     at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:97)
>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>     at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
>     at akka.dispatch.Mailbox.run(Mailbox.scala:220)
>     at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.lang.ClassNotFoundException: org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher$1
>     at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>     at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>     at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>     ... 18 more
> -Ebru
>
> Hi Piotrek,
> We attached the full log of the taskmanager1.
> This may not be a dependency issue, because we didn't get any
> NoClassDefFoundError until all of the task slots were full; when there is
> available memory, jobs can run without exceptions for days.
> There is also a Kafka Instance Already Exists exception in the full log,
> but this is not relevant and doesn't affect jobs or task managers.
> -Ebru<taskmanager1.log.zip>
>
> Hi,
> Sorry, we attached the wrong log file. I've attached the logs of all task
> managers and the job manager. All task managers and the job manager were
> killed.<logs.zip>
>
> <logs2-2.zip>
>
>
>


-- 
Flavio Pompermaier
Development Department

OKKAM S.r.l.
Tel. +(39) 0461 041809
