Hi Julien,

By default, Flink manages 70% of the free memory in a TaskManager so that it can cache data efficiently, as described in the article you mentioned: "https://flink.apache.org/news/2015/05/11/Juggling-with-Bits-and-Bytes.html". Once allocated, this managed memory stays resident and is referenced by the MemoryManager, so it ends up in the old generation of the JVM heap and is not reclaimed by GC. This way we can avoid the cost of repeatedly creating and collecting objects.

The parameter "taskmanager.memory.preallocate" defaults to false, which means the managed memory is not allocated when the TaskManager starts. When a job runs, its tasks request managed memory, and that is when you see high memory consumption. When the job is cancelled, the managed memory is released back to the MemoryManager but is not reclaimed by GC, so you see no change in memory consumption. After you restart the TaskManager, the initial memory consumption is low again because allocation is lazy with taskmanager.memory.preallocate=false.
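
To make the behaviour above more concrete, here is a toy sketch in Java. It is not Flink's MemoryManager; the class LazySegmentPool and all of its methods are hypothetical names made up for illustration. It only models the idea: nothing is allocated at startup, segments are allocated on first request, and releasing them returns them to the pool instead of to the GC.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical illustration only; this is NOT Flink's MemoryManager.
final class LazySegmentPool {
    private final int maxSegments;
    private final int segmentSize;
    // Released segments stay referenced here, so the GC cannot reclaim them.
    private final Deque<byte[]> free = new ArrayDeque<>();
    // Number of segments ever allocated on the JVM heap.
    private int created = 0;

    LazySegmentPool(int maxSegments, int segmentSize) {
        this.maxSegments = maxSegments;
        this.segmentSize = segmentSize;
        // Models preallocate=false: no memory is allocated at startup.
    }

    // A task requests n segments; new ones are allocated only if the pool
    // has no released segment to hand out.
    List<byte[]> request(int n) {
        if (n > availableSegments()) {
            throw new IllegalStateException("pool exhausted");
        }
        List<byte[]> out = new ArrayList<>(n);
        for (int i = 0; i < n; i++) {
            byte[] segment = free.poll();
            if (segment == null) {
                segment = new byte[segmentSize];
                created++;
            }
            out.add(segment);
        }
        return out;
    }

    // A cancelled job hands its segments back; they are pooled, not freed.
    void release(List<byte[]> segments) {
        free.addAll(segments);
        segments.clear();
    }

    // Heap held by the pool, whether or not a job is currently using it.
    long heapBytesHeld() {
        return (long) created * segmentSize;
    }

    int availableSegments() {
        return free.size() + (maxSegments - created);
    }
}
```

With this sketch, heapBytesHeld() is 0 right after construction, grows when a job requests segments, and stays the same after release(). Only discarding the whole pool (the analogue of restarting the TaskManager) brings it back to 0.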

From: Paul Lam <paullin3280@gmail.com>
Sent: Wednesday, October 17, 2018 12:31
To: jpreisner <jpreisner@free.fr>
Cc: user <user@flink.apache.org>
Subject: Re: Need help to understand memory consumption

Hi Julien,

AFAIK, streaming jobs put data objects on the heap, so it is up to the JVM GC to release the memory.

Paul Lam

> On Oct 12, 2018, at 14:29, jpreisner@free.fr wrote:

> Hi,

> My use case is:
> - I use Flink 1.4.1 in a standalone cluster with 5 VMs (1 VM = 1 JobManager + 1 TaskManager)
> - I run N jobs per day. N may vary (one day: N=20, another day: N=50, ...). All jobs are the same. They connect to Kafka topics and have two DB2 connectors.
> - Depending on a special event, a job can self-restart via the command: bin/flink cancel <JobID>
> - At the end of the day, I cancel all jobs
> - Each VM is configured with 16 GB of RAM
> - The memory allocated to one taskmanager is 10 GB

> After several days, the memory saturates (we exceed 14 GB of used memory).

> I read the following posts but I did not succeed in understanding my problem :
> - https://flink.apache.org/news/2015/05/11/Juggling-with-Bits-and-Bytes.html
> - http://mail-archives.apache.org/mod_mbox/flink-user/201711.mbox/browser

> I did some tests on a machine (outside the cluster) with the top command, and this is what I concluded (please see attached file - Flink_memory.PNG):
> - When a job is started and running, it consumes memory
> - When a job is cancelled, a large part of the memory is still used
> - When another job is started and running (after having cancelled the previous job), even more memory is consumed
> - When I restart jobmanager and taskmanager, memory returns to normal

> Why is the memory not released when a job is cancelled?

> I added another attachment that represents the graph of a job - Graph.PNG.
> In case it is useful: we use MapFunction, FlatMapFunction, FilterFunction, triggers and windows, ...

> Thanks in advance,
> Julien
> <Flink_memory.xlsx> <Graph.PNG> <Flink_memory.PNG>