flink-user mailing list archives

From Daniel Santos <dsan...@cryptolab.net>
Subject Re: JVM Non Heap Memory
Date Mon, 05 Dec 2016 15:44:23 GMT
Hello,

Thank you all for the kind replies.

I've got the general idea. I am using Flink version 1.1.3.

So it seems the fix for Meters won't make it into 1.1.4?

Best Regards,

Daniel Santos


On 12/05/2016 01:28 PM, Chesnay Schepler wrote:
> We don't have to include it in 1.1.4 since Meters do not exist in
> 1.1; my bad for tagging it in JIRA for 1.1.4.
>
> On 05.12.2016 14:18, Ufuk Celebi wrote:
>> Just to note that the bug mentioned by Chesnay does not invalidate 
>> Stefan's comments. ;-)
>>
>> Chesnay's issue is here: 
>> https://issues.apache.org/jira/browse/FLINK-5261
>>
>> I added an issue to improve the documentation about cancellation 
>> (https://issues.apache.org/jira/browse/FLINK-5260).
>>
>> Which version of Flink are you using? Chesnay's fix will make it into 
>> the upcoming 1.1.4 release.
>>
>>
>> On 5 December 2016 at 14:04:49, Chesnay Schepler (chesnay@apache.org) 
>> wrote:
>>> Hello Daniel,
>>>   I'm afraid you stumbled upon a bug in Flink. Meters were not properly
>>> cleaned up, causing the underlying dropwizard meter update threads to
>>> not be shut down either.
>>>   I've opened a JIRA
>>> and will open a PR soon.
>>>   Thank you for reporting this issue.
>>>   Regards,
>>> Chesnay
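
Editorial sketch (not part of the original message): the lingering threads in the dump are parked inside a ScheduledThreadPoolExecutor, and such a worker thread stays alive until the executor is shut down explicitly. The following minimal Java example illustrates that general behaviour with the standard library only. The class name, the thread name and the 5 second period are assumptions made for illustration; this is not Flink's or dropwizard's actual code.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

class TickThreadLeakDemo {

    // Hypothetical thread name, chosen only to resemble the one in the dump.
    static final String THREAD_NAME = "demo-meter-tick-thread";

    /** Counts live threads in this JVM whose name equals THREAD_NAME. */
    static long liveTickThreads() {
        return Thread.getAllStackTraces().keySet().stream()
                .filter(t -> THREAD_NAME.equals(t.getName()))
                .count();
    }

    /** Starts a scheduler with one periodic no-op task, similar to a rate "tick". */
    static ScheduledExecutorService startTicker() {
        ScheduledExecutorService ticker =
                Executors.newSingleThreadScheduledExecutor(r -> new Thread(r, THREAD_NAME));
        ticker.scheduleAtFixedRate(() -> { }, 0, 5, TimeUnit.SECONDS);
        return ticker;
    }

    /** Shuts the scheduler down and waits up to 5 seconds for its thread to exit. */
    static void stopTicker(ScheduledExecutorService ticker) {
        ticker.shutdownNow();
        long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
        while (liveTickThreads() > 0 && System.nanoTime() < deadline) {
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    public static void main(String[] args) {
        ScheduledExecutorService ticker = startTicker();
        System.out.println("tick threads while running: " + liveTickThreads());
        // Without this call the worker thread would park forever, as in the dump.
        stopTicker(ticker);
        System.out.println("tick threads after shutdown: " + liveTickThreads());
    }
}
```

If every job start created such a scheduler and cancellation never shut it down, the thread count would only ever grow, which matches the pattern reported in this thread.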
>>>   On 05.12.2016 12:05, Stefan Richter wrote:
>>>> Hi Daniel,
>>>>
>>>> the behaviour you observe looks like some threads are not canceled.
>>>> Thread cancelation in Flink (and Java in general) is always
>>>> cooperative, where cooperative means that the thread you want to
>>>> cancel should somehow check cancelation and react to it. Sometimes
>>>> this also requires some effort from the client that wants to cancel a
>>>> thread. So if you implement e.g. custom operators or functions with
>>>> aerospike, you must ensure that they a) react to cancelation and b)
>>>> clean up their resources. If you do not consider this, your aerospike
>>>> client might stay in a blocking call forever; blocking IO calls in
>>>> particular are prone to this. What you need to ensure is that
>>>> cancelation from the clients includes closing IO resources such as
>>>> streams to unblock the thread and allow for termination. This means
>>>> that your code must (to a certain degree) actively
>>>> participate in Flink's task lifecycle. In Flink 1.2 we introduce a
>>>> feature called CloseableRegistry, which makes participating in this
>>>> lifecycle easier w.r.t. closing resources. For the time being, you
>>>> should check that Flink's task cancelation also causes your code to
>>>> close the aerospike client and check cancelation flags.
>>>>
>>>> Best,
>>>> Stefan
>>>>
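
Editorial sketch (not part of the original message): the cooperative cancellation pattern described above can be illustrated with plain Java, without Flink or Aerospike. The worker below blocks in a read on a java.nio Pipe that never receives data; cancelling it means setting a flag and closing the IO resource, which unblocks the read. The class name, the Pipe standing in for a client connection, and the 200 ms warm-up are assumptions made for illustration only.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;

class CancellableBlockingTask implements Runnable, AutoCloseable {

    private final Pipe pipe;                  // stand-in for a blocking client connection
    private volatile boolean running = true;  // cancellation flag checked by the worker

    CancellableBlockingTask() throws IOException {
        this.pipe = Pipe.open();
    }

    @Override
    public void run() {
        ByteBuffer buffer = ByteBuffer.allocate(64);
        while (running) {
            try {
                buffer.clear();
                // Blocks: nothing is ever written to the pipe in this demo.
                if (pipe.source().read(buffer) < 0) {
                    return; // end of stream
                }
            } catch (IOException e) {
                // A close() from another thread ends the blocked read with an exception.
                if (running) {
                    throw new UncheckedIOException(e); // genuine IO failure, not a cancel
                }
            }
        }
    }

    /** Cancels the task: set the flag first, then close the resource to unblock read(). */
    @Override
    public void close() {
        running = false;
        try {
            pipe.source().close();
            pipe.sink().close();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Starts a worker, cancels it, and reports whether it terminated in time. */
    static boolean cancelAndJoin(long timeoutMillis) {
        try {
            CancellableBlockingTask task = new CancellableBlockingTask();
            Thread worker = new Thread(task, "blocking-worker");
            worker.start();
            Thread.sleep(200);   // give the worker time to block in read()
            task.close();
            worker.join(timeoutMillis);
            return !worker.isAlive();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("worker terminated: " + cancelAndJoin(5000));
    }
}
```

In a Flink user function the equivalent step would presumably live in the function's close path, but how exactly the Aerospike client must be closed is not shown in this thread, so that part is left out here.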
>>>>> On 05.12.2016 at 11:42, Daniel Santos wrote:
>>>>>
>>>>> Hello,
>>>>>
>>>>> I have checked the threads and taken some dumps, and I have disabled
>>>>> checkpointing.
>>>>>
>>>>> Here are my findings.
>>>>>
>>>>> I did a thread dump a few hours after I booted up the whole cluster.
>>>>> (@2/12/2016; 5 TM ; 3GB HEAP each ; 7GB total each as Limit )
>>>>>
>>>>> The dump shows that most threads come from 3 sources.
>>>>>
>>>>> OutputFlusher --- 634 -- Sleeping State
>>>>>
>>>>> "OutputFlusher" - Thread t@4758
>>>>> java.lang.Thread.State: TIMED_WAITING
>>>>> at java.lang.Thread.sleep(Native Method)
>>>>> at org.apache.flink.streaming.runtime.io.StreamRecordWriter$OutputFlusher.run(StreamRecordWriter.java:164)
>>>>>
>>>>> Locked ownable synchronizers:
>>>>> - None
>>>>>
>>>>> Metrics --- 376 (the Flink metrics reporter is the only metrics system in use) -- Parked State
>>>>>
>>>>> "metrics-meter-tick-thread-1" - Thread t@29024
>>>>> java.lang.Thread.State: TIMED_WAITING
>>>>> at sun.misc.Unsafe.park(Native Method)
>>>>> - parking to wait for (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>>>>> at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
>>>>> at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
>>>>> at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1093)
>>>>> at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)
>>>>> at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067)
>>>>> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127)
>>>>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>>> at java.lang.Thread.run(Thread.java:745)
>>>>>
>>>>> Locked ownable synchronizers:
>>>>> - None
>>>>>
>>>>> tend -- 220 ( Aerospike Client Thread ) -- Sleeping State
>>>>>
>>>>> "tend" - Thread t@29011
>>>>> java.lang.Thread.State: TIMED_WAITING
>>>>> at java.lang.Thread.sleep(Native Method)
>>>>> at com.aerospike.client.util.Util.sleep(Util.java:38)
>>>>> at com.aerospike.client.cluster.Cluster.run(Cluster.java:262)
>>>>> at java.lang.Thread.run(Thread.java:745)
>>>>>
>>>>> Locked ownable synchronizers:
>>>>> - None
>>>>>
>>>>>
>>>>> I have 2 streaming jobs and a batch job that runs once in a while.
>>>>>
>>>>> Streaming job A runs with a parallelism of 2 and uses Aerospike only in
>>>>> a RichSink.
>>>>>
>>>>> Streaming job B runs with a parallelism of 24 and uses Aerospike in
>>>>> RichFilterFunction / RichMapFunction with open and close methods, in
>>>>> order to open and close the client.
>>>>>
>>>>> The batch job uses the Aerospike client in RichFilterFunction /
>>>>> RichMapFunction with open and close methods in order to open and
>>>>> close the client.
>>>>>
>>>>> Next, I cancelled all the streaming jobs @5/12/2016 and checked
>>>>> the threads and the JVM non-heap usage.
>>>>>
>>>>> JVM non-heap usage reaches 3GB, threads go down, but some still
>>>>> linger around and they are the following.
>>>>>
>>>>> Metrics --- 790 (the Flink metrics reporter is the only metrics system in use)
>>>>>
>>>>> "metrics-meter-tick-thread-1" - Thread t@29024
>>>>> java.lang.Thread.State: TIMED_WAITING
>>>>> at sun.misc.Unsafe.park(Native Method)
>>>>> - parking to wait for (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>>>>> at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
>>>>> at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
>>>>> at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1093)
>>>>> at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)
>>>>> at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067)
>>>>> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127)
>>>>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>>> at java.lang.Thread.run(Thread.java:745)
>>>>>
>>>>> Locked ownable synchronizers:
>>>>> - None
>>>>>
>>>>> tend -- 432 ( Aerospike Client Thread )
>>>>>
>>>>> "tend" - Thread t@29011
>>>>> java.lang.Thread.State: TIMED_WAITING
>>>>> at java.lang.Thread.sleep(Native Method)
>>>>> at com.aerospike.client.util.Util.sleep(Util.java:38)
>>>>> at com.aerospike.client.cluster.Cluster.run(Cluster.java:262)
>>>>> at java.lang.Thread.run(Thread.java:745)
>>>>>
>>>>> Locked ownable synchronizers:
>>>>> - None
>>>>>
>>>>>
>>>>> The total number of threads is 1289 ( total ) / 1220 ( tend + metrics ) .
>>>>> So I have 1220 threads that I believe should be dead and not
>>>>> running, since I have no jobs running at all.
>>>>>
>>>>> And the JVM Non-HEAP usage doesn't decrease at all after removing
>>>>> every job.
>>>>>
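
Editorial sketch (not part of the original message): per-source thread counts like the ones above can be produced by grouping live threads by name. The minimal Java example below does this for the JVM it runs in; the class name and the rule of stripping a trailing "-<number>" from thread names are assumptions made for illustration. For a separate TaskManager process, a dump taken with a tool such as jstack would be the input instead.

```java
import java.util.Map;
import java.util.TreeMap;

class ThreadCensus {

    /** Strips a trailing "-<number>" so pooled threads group under one name. */
    static String baseName(String threadName) {
        return threadName.replaceAll("-\\d+$", "");
    }

    /** Returns the number of live threads per base name, sorted by name. */
    static Map<String, Integer> countByName() {
        Map<String, Integer> counts = new TreeMap<>();
        for (Thread t : Thread.getAllStackTraces().keySet()) {
            counts.merge(baseName(t.getName()), 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // Prints one line per thread group: count, tab, base name.
        countByName().forEach((name, n) -> System.out.println(n + "\t" + name));
    }
}
```

Comparing two such censuses, one taken while jobs run and one after all jobs are cancelled, shows which thread groups fail to go away.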
>>>>>
>>>>> Why the hell do the metrics threads grow without end?
>>>>>
>>>>> I am using the following libs for metrics :
>>>>>
>>>>> - metrics-graphite-3.1.0.jar
>>>>>
>>>>> - metrics-core-3.1.0.jar
>>>>>
>>>>> - flink-metrics-dropwizard-1.1.3.jar
>>>>>
>>>>> - flink-metrics-graphite-1.1.3.jar
>>>>>
>>>>> And the config for reporter is :
>>>>>
>>>>> metrics.reporters: graphite
>>>>> metrics.reporter.graphite.class: org.apache.flink.metrics.graphite.GraphiteReporter
>>>>> metrics.reporter.graphite.host: CARBONRELAYHOST
>>>>> metrics.reporter.graphite.port: 2003
>>>>>
>>>>>
>>>>> Shouldn't the Aerospike client also be closed? Or am I missing
>>>>> something, or doing something wrong?
>>>>>
>>>>>
>>>>> Sorry for the long post.
>>>>>
>>>>> Best Regards,
>>>>>
>>>>> Daniel Santos
>>>>>
>>>>>
>>>>> On 11/29/2016 04:57 PM, Ufuk Celebi wrote:
>>>>>> Hey Daniel!
>>>>>>
>>>>>> Thanks for reporting this. Unbounded growth of non-heap memory is
>>>>>> not expected. What kind of Threads are you seeing being
>>>>>> spawned/lingering around?
>>>>>>
>>>>>> As a first step, could you try to disable checkpointing and see
>>>>>> how it behaves afterwards?
>>>>>>
>>>>>> – Ufuk
>>>>>>
>>>>>> On 29 November 2016 at 17:32:32, Daniel Santos 
>>>>>> (dsantos@cryptolab.net) wrote:
>>>>>>> Hello,
>>>>>>>
>>>>>>> Nope, I am using Hadoop HDFS as the state backend, Kafka as a source,
>>>>>>> and an HttpClient as a sink, with Kafka as a sink as well.
>>>>>>> So is it possible that the state backend is the culprit?
>>>>>>>
>>>>>>> The curious thing is that even when no jobs are running, streaming or
>>>>>>> otherwise, the JVM Non-HEAP usage stays the same,
>>>>>>> which I find odd.
>>>>>>>
>>>>>>> Another curious thing is that it is proportional to the increase
>>>>>>> in the number of JVM threads.
>>>>>>> Whenever there are more JVM threads running, more JVM
>>>>>>> Non-HEAP memory is used, which makes sense.
>>>>>>> But the threads stick around and never decrease, and likewise the JVM
>>>>>>> Non-HEAP memory.
>>>>>>>
>>>>>>> These observations are based on the metrics that Flink
>>>>>>> sends to our Graphite system.
>>>>>>>
>>>>>>> Best Regards,
>>>>>>>
>>>>>>> Daniel Santos
>>>>>>>
>>>>>>> On 11/29/2016 04:04 PM, Cliff Resnick wrote:
>>>>>>>> Are you using the RocksDB backend in native mode? If so then the
>>>>>>>> off-heap memory may be there.
>>>>>>>>
>>>>>>>> On Tue, Nov 29, 2016 at 9:54 AM, > > > wrote:
>>>>>>>>
>>>>>>>> I have the same problem, but I submit the Flink job to YARN.
>>>>>>>> I submit the job from computer 22 and it runs successfully; the
>>>>>>>> JobManager is on computer 79 and the TaskManager on computer 69, so
>>>>>>>> there are three different computers.
>>>>>>>> However, on computer 22 the process with pid=3463, which is the one
>>>>>>>> that submitted the job to YARN, uses 2.3 GB of memory, 15% of the total.
>>>>>>>> The command is: ./flink run -m yarn-cluster -yn 1 -ys 1 -yjm 1024
>>>>>>>> -ytm 1024 ....
>>>>>>>> Why does it occupy so much memory on computer 22, when the job is
>>>>>>>> running on computers 79 and 69?
>>>>>>>> What would be the possible causes of such behavior ?
>>>>>>>> Best Regards,
>>>>>>>> ----- Original Message -----
>>>>>>>> From: Daniel Santos
>>>>>>>> To: user@flink.apache.org
>>>>>>>> Subject: JVM Non Heap Memory
>>>>>>>> Date: November 29, 2016, 22:26
>>>>>>>>
>>>>>>>>
>>>>>>>> Hello,
>>>>>>>> Is it common to have high Non-Heap usage in the JVM?
>>>>>>>> I am running Flink in a stand-alone cluster and in Docker, with each
>>>>>>>> container being capped at 6G of memory.
>>>>>>>> I have been struggling to keep memory usage in check.
>>>>>>>> The non-heap usage increases without end. It starts with just 100MB of
>>>>>>>> usage and after a day it reaches 1.3GB.
>>>>>>>> Then it eventually reaches 2GB, and eventually the container is
>>>>>>>> killed because it has reached the memory limit.
>>>>>>>> My configuration for each Flink task manager is the following:
>>>>>>>> ----------- flink-conf.yaml --------------
>>>>>>>> taskmanager.heap.mb: 3072
>>>>>>>> taskmanager.numberOfTaskSlots: 8
>>>>>>>> taskmanager.memory.preallocate: false
>>>>>>>> taskmanager.network.numberOfBuffers: 12500
>>>>>>>> taskmanager.memory.off-heap: false
>>>>>>>> ---------------------------------------------
>>>>>>>> What would be the possible causes of such behavior ?
>>>>>>>> Best Regards,
>>>>>>>> Daniel Santos
>>>>>>>>
>>>>>>>>
>>>>>>>
>>
>

