flink-user mailing list archives

From Chesnay Schepler <ches...@apache.org>
Subject Re: JVM Non Heap Memory
Date Mon, 05 Dec 2016 13:04:44 GMT
Hello Daniel,

I'm afraid you stumbled upon a bug in Flink: meters were not properly 
cleaned up, so the underlying dropwizard meter update threads were never 
shut down either.

I've opened a JIRA <https://issues.apache.org/jira/browse/FLINK-5261> 
and will open a PR soon.

Thank you for reporting this issue.

Regards,
Chesnay

On 05.12.2016 12:05, Stefan Richter wrote:
> Hi Daniel,
>
> the behaviour you observe looks like some threads are not cancelled. 
> Thread cancellation in Flink (and in Java in general) is always 
> cooperative, meaning that the thread you want to cancel should somehow 
> check for cancellation and react to it. Sometimes this also requires 
> some effort from the client that wants to cancel a thread. So if you 
> implement e.g. custom operators or functions with Aerospike, you must 
> ensure that they a) react to cancellation and b) clean up their 
> resources. If you do not consider this, your Aerospike client might 
> stay in a blocking call forever; blocking IO calls in particular are 
> prone to this. What you need to ensure is that cancellation also closes 
> IO resources such as streams, to unblock the thread and allow for 
> termination. This means that your code must (to a certain degree) 
> actively participate in Flink's task lifecycle. In Flink 1.2 we 
> introduce a feature called CloseableRegistry, which makes participating 
> in this lifecycle easier w.r.t. closing resources. For the time being, 
> you should make sure that Flink's task cancellation also causes your 
> code to close the Aerospike client and to check cancellation flags.
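>
> For illustration, a minimal sketch of a sink that participates in this 
> lifecycle might look as follows (class name, host and record layout are 
> made up; the point is only that the client is created in open() and 
> closed in close(), which is also invoked when the task shuts down or is 
> cancelled):
>
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
>
> import com.aerospike.client.AerospikeClient;
> import com.aerospike.client.Bin;
> import com.aerospike.client.Key;
>
> public class AerospikeSink extends RichSinkFunction<String> {
>
>     private transient AerospikeClient client;
>
>     @Override
>     public void open(Configuration parameters) throws Exception {
>         // One client per parallel sink instance.
>         client = new AerospikeClient("aerospike-host", 3000);
>     }
>
>     @Override
>     public void invoke(String value) throws Exception {
>         Key key = new Key("test", "events", value);
>         client.put(null, key, new Bin("payload", value));
>     }
>
>     @Override
>     public void close() throws Exception {
>         // Runs on normal termination and on cancellation; closing the
>         // client here stops its internal "tend" thread and unblocks
>         // pending IO so the task thread can terminate.
>         if (client != null) {
>             client.close();
>         }
>     }
> }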
>
> Best,
> Stefan
>
>> On 05.12.2016 at 11:42, Daniel Santos <dsantos@cryptolab.net> wrote:
>>
>> Hello,
>>
>> I have done some thread checking and thread dumps, and I have disabled 
>> checkpointing.
>>
>> Here are my findings.
>>
>> I did a thread dump a few hours after I booted up the whole cluster 
>> (@2/12/2016; 5 TMs; 3 GB heap each; 7 GB total limit each).
>>
>> The dump shows that most threads come from 3 sources.
>>
>> OutputFlusher --- 634 -- Sleeping State
>>
>> "OutputFlusher" - Thread t@4758
>>    java.lang.Thread.State: TIMED_WAITING
>>         at java.lang.Thread.sleep(Native Method)
>>         at 
>> org.apache.flink.streaming.runtime.io.StreamRecordWriter$OutputFlusher.run(StreamRecordWriter.java:164)
>>
>>    Locked ownable synchronizers:
>>         - None
>>
>> Metrics --- 376 ( the Flink metrics reporter is the only metrics being 
>> used ) -- Parked State
>>
>> "metrics-meter-tick-thread-1" - Thread t@29024
>>    java.lang.Thread.State: TIMED_WAITING
>>         at sun.misc.Unsafe.park(Native Method)
>>         - parking to wait for <bcfb9f9> (a 
>> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>>         at 
>> java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
>>         at 
>> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
>>         at 
>> java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1093)
>>         at 
>> java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)
>>         at 
>> java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067)
>>         at 
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127)
>>         at 
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>         at java.lang.Thread.run(Thread.java:745)
>>
>>    Locked ownable synchronizers:
>>         - None
>>
>> tend -- 220 ( Aerospike Client Thread ) -- Sleeping State
>>
>>  "tend" - Thread t@29011
>>    java.lang.Thread.State: TIMED_WAITING
>>         at java.lang.Thread.sleep(Native Method)
>>         at com.aerospike.client.util.Util.sleep(Util.java:38)
>>         at com.aerospike.client.cluster.Cluster.run(Cluster.java:262)
>>         at java.lang.Thread.run(Thread.java:745)
>>
>>    Locked ownable synchronizers:
>>         - None
>>
>>
>> I have 2 streaming jobs and a batch Job that runs once in a while.
>>
>> Streaming job A runs with a parallelism of 2 and uses Aerospike only in 
>> a RichSinkFunction.
>>
>> Streaming job B runs with a parallelism of 24 and uses Aerospike in a 
>> RichFilterFunction / RichMapFunction with open and close methods, in 
>> order to open and close the client.
>>
>> The batch job uses the Aerospike client in a RichFilterFunction / 
>> RichMapFunction with open and close methods, in order to open and 
>> close the client.
>>
>> Next, I cancelled all the streaming jobs (@5/12/2016) and checked the 
>> threads and the JVM non-heap usage.
>>
>> The JVM non-heap usage reaches 3 GB; the thread count goes down, but 
>> some threads still linger around, and they are the following.
>>
>> Metrics --- 790 ( the Flink metrics reporter is the only metrics being 
>> used )
>>
>> "metrics-meter-tick-thread-1" - Thread t@29024
>>    java.lang.Thread.State: TIMED_WAITING
>>         at sun.misc.Unsafe.park(Native Method)
>>         - parking to wait for <bcfb9f9> (a 
>> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>>         at 
>> java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
>>         at 
>> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
>>         at 
>> java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1093)
>>         at 
>> java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)
>>         at 
>> java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067)
>>         at 
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127)
>>         at 
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>         at java.lang.Thread.run(Thread.java:745)
>>
>>    Locked ownable synchronizers:
>>         - None
>>
>>
>> tend -- 432 ( Aerospike Client Thread )
>>
>>
>>  "tend" - Thread t@29011
>>    java.lang.Thread.State: TIMED_WAITING
>>         at java.lang.Thread.sleep(Native Method)
>>         at com.aerospike.client.util.Util.sleep(Util.java:38)
>>         at com.aerospike.client.cluster.Cluster.run(Cluster.java:262)
>>         at java.lang.Thread.run(Thread.java:745)
>>
>>    Locked ownable synchronizers:
>>         - None
>>
>>
>> The total number of threads is 1289, of which 1220 are tend + metrics 
>> threads. So I have 1220 threads that I believe should be dead and not 
>> running, since I have no jobs running at all.
>>
>> And the JVM non-heap usage doesn't decrease at all after removing 
>> every job.
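>>
>> ( To quantify this from inside the TaskManager JVM, a small helper 
>> along these lines, with made-up names, can count the lingering threads 
>> by name prefix. Note also that every live thread reserves its own 
>> stack, typically around 1 MB by default, so roughly 1220 lingering 
>> threads can easily account for more than a gigabyte of memory outside 
>> the heap. )
>>
>> public final class ThreadCounter {
>>     // Counts live threads in this JVM whose names start with the prefix.
>>     public static long countByPrefix(String prefix) {
>>         return Thread.getAllStackTraces().keySet().stream()
>>                 .filter(t -> t.getName().startsWith(prefix))
>>                 .count();
>>     }
>>
>>     // Example: the two thread families seen in the dumps above.
>>     public static String summary() {
>>         return "tend=" + countByPrefix("tend")
>>                 + ", meter-tick=" + countByPrefix("metrics-meter-tick-thread");
>>     }
>> }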
>>
>>
>> Why the hell do the metrics threads keep growing with no end?
>>
>> I am using the following libs for metrics:
>>
>> - metrics-graphite-3.1.0.jar
>>
>> - metrics-core-3.1.0.jar
>>
>> - flink-metrics-dropwizard-1.1.3.jar
>>
>> - flink-metrics-graphite-1.1.3.jar
>>
>> And the config for the reporter is:
>>
>> metrics.reporters: graphite
>> metrics.reporter.graphite.class: 
>> org.apache.flink.metrics.graphite.GraphiteReporter
>> metrics.reporter.graphite.host: CARBONRELAYHOST
>> metrics.reporter.graphite.port: 2003
>>
>>
>> Shouldn't the Aerospike client also be closed? Or am I missing 
>> something, or doing something wrong?
>>
>>
>> Sorry for the long post.
>>
>> Best Regards,
>>
>> Daniel Santos
>>
>>
>> On 11/29/2016 04:57 PM, Ufuk Celebi wrote:
>>> Hey Daniel!
>>>
>>> Thanks for reporting this. Unbounded growth of non-heap memory is not expected.
>>> What kind of threads are you seeing being spawned/lingering around?
>>>
>>> As a first step, could you try to disable checkpointing and see how it
>>> behaves afterwards?
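>>>
>>> ( Checkpointing is only active if it was explicitly enabled on the
>>> environment, so disabling it for the test is just a matter of not
>>> calling enableCheckpointing; a minimal fragment: )
>>>
>>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>>
>>> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>> // env.enableCheckpointing(10_000);  // left out / commented out for the experiment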
>>>
>>> – Ufuk
>>>
>>> On 29 November 2016 at 17:32:32, Daniel Santos (dsantos@cryptolab.net) wrote:
>>>> Hello,
>>>>   
>>>> Nope, I am using Hadoop HDFS as the state backend, Kafka as a source,
>>>> and an HttpClient as a sink, plus Kafka as a sink.
>>>> So is it possible that the state backend is the culprit?
>>>>
>>>> The curious thing is that even when no jobs are running, streaming or
>>>> otherwise, the JVM non-heap usage stays the same.
>>>> Which I find odd.
>>>>
>>>> Another curious thing is that it is proportional to an increase in the
>>>> number of JVM threads.
>>>> Whenever there are more JVM threads running, there is also more JVM
>>>> non-heap memory being used, which makes sense.
>>>> But the threads stick around and never decrease, and so does the JVM
>>>> non-heap memory.
>>>>
>>>> These observations are based on the Flink metrics that are being sent
>>>> to and recorded in our Graphite system.
>>>>   
>>>> Best Regards,
>>>>   
>>>> Daniel Santos
>>>>   
>>>> On 11/29/2016 04:04 PM, Cliff Resnick wrote:
>>>>> Are you using the RocksDB backend in native mode? If so, then the
>>>>> off-heap memory may come from there.
>>>>>
>>>>> On Tue, Nov 29, 2016 at 9:54 AM, > > > wrote:
>>>>>
>>>>> I have the same problem, but I submit the Flink job to YARN.
>>>>> I submit the job to YARN from computer 22, and the job runs
>>>>> successfully; the JobManager is on computer 79 and the TaskManager is
>>>>> on computer 69, three different computers.
>>>>> However, on computer 22, the process with pid=3463, which is the job
>>>>> that was submitted to YARN, uses 2.3 GB of memory, 15% of the total.
>>>>> The command is: ./flink run -m yarn-cluster -yn 1 -ys 1 -yjm 1024
>>>>> -ytm 1024 ....
>>>>> Why does computer 22 occupy so much memory? The job is running on
>>>>> computer 79 and computer 69.
>>>>> What would be the possible causes of such behavior?
>>>>> Best Regards,
>>>>> ----- Original Message -----
>>>>> From: Daniel Santos > > >
>>>>> To: user@flink.apache.org
>>>>> Subject: JVM Non Heap Memory
>>>>> Date: 29 November 2016, 22:26
>>>>>
>>>>>
>>>>> Hello,
>>>>> Is it common to have high non-heap usage in the JVM?
>>>>> I am running Flink in a standalone cluster and in Docker, with each
>>>>> Docker container being capped at 6 GB of memory.
>>>>> I have been struggling to keep memory usage in check.
>>>>> The non-heap memory increases with no end. It starts with just 100 MB
>>>>> of usage and after a day it reaches 1.3 GB.
>>>>> Then it eventually reaches 2 GB and the Docker container is killed
>>>>> because it has reached the memory limit.
>>>>> My configuration for each flink task manager is the following :
>>>>> ----------- flink-conf.yaml --------------
>>>>> taskmanager.heap.mb: 3072
>>>>> taskmanager.numberOfTaskSlots: 8
>>>>> taskmanager.memory.preallocate: false
>>>>> taskmanager.network.numberOfBuffers: 12500
>>>>> taskmanager.memory.off-heap: false
>>>>> ---------------------------------------------
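>>>>> ( A rough budget check: with the container capped at 6 GB and the JVM
>>>>> heap set to 3 GB, roughly 6 GB - 3 GB = ~3 GB is left for everything
>>>>> outside the heap (metaspace, code cache, thread stacks, direct and
>>>>> native memory), so non-heap usage creeping towards 2 GB, on top of
>>>>> normal heap usage, is enough to hit the container limit. )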
>>>>> What would be the possible causes of such behavior ?
>>>>> Best Regards,
>>>>> Daniel Santos
>>>>>
>>>>>
>>>>   
>>>>   
>>
>

