flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Stefano Bortoli <stefano.bort...@huawei.com>
Subject RE: Flink memory usage
Date Thu, 20 Apr 2017 14:33:23 GMT
Hi Billy,

The only suggestion I can give is to check very well in your code for useless variable allocations,
and foster reuse as much as possible. Don’t create a new collection at any map execution,
but rather clear, reuse the collected output of the flatMap, and so on.  In the past we run
long process of lot of data and small memory without problems. Many more complex co-group,
joins and so on without any issue.

My2c. Hope it helps.


From: Newport, Billy [mailto:Billy.Newport@gs.com]
Sent: Thursday, April 20, 2017 1:31 PM
To: 'Fabian Hueske' <fhueske@gmail.com>
Cc: 'user@flink.apache.org' <user@flink.apache.org>
Subject: RE: Flink memory usage

I don’t think our function are memory heavy they typically are cogroups and merge the records
on the left with the records on the right.

We’re currently requiring 720GB of heap to do our processing which frankly appears ridiculous
to us. Could too much parallelism be causing the problem? Looking at:


If we are processing 17 “datasets” in a single job and each has an individual parallelism
of 40 is that a total parallelism (potential) of 17*40 and given your network buffers calculation
of parallelism squared, would that do it or only if we explicitly configure it that way:

taskmanager.network.numberOfBuffers: p ^ 2 * t * 4

where p is the maximum parallelism of the job and t is the number of task manager.
You can process more than one parallel task per TM if you configure more than one processing
slot per machine ( taskmanager.numberOfTaskSlots). The TM will divide its memory among all
its slots. So it would be possible to start one TM for each machine with 100GB+ memory and
48 slots each.

Our pipeline for each dataset looks like this:

Read avro file -> FlatMap -> Validate each record with a flatmap ->
Read Parquet -> FlatMap -> Filter Live Rows -> CoGroup with the validated avro file
above -> }
Read Parquet -> FlatMap -> Filter Dead Rows  ------------------------------------>
 } Union cogroup with dead rows and write result to parquet file.

I don’t understand why this logic couldn’t run with a single task manager and just take
longer. We’re having a lot of trouble trying to change the tuning to reduce the memory burn.
We run the above pipeline with parallelism 40 for all 17 datasets in a single job.

We’re running this config now which is not really justifiable for what we’re doing.

20 nodes 2 slots, 40 parallelism 36GB mem = 720GB of heap…


From: Fabian Hueske [mailto:fhueske@gmail.com]
Sent: Wednesday, April 19, 2017 10:52 AM
To: Newport, Billy [Tech]
Cc: user@flink.apache.org<mailto:user@flink.apache.org>
Subject: Re: Flink memory usage

Hi Billy,
Flink's internal operators are implemented to not allocate heap space proportional to the
size of the input data.
Whenever Flink needs to hold data in memory (e.g., for sorting or building a hash table) the
data is serialized into managed memory. If all memory is in use, Flink starts spilling to
disk. This blog post discusses how Flink uses its managed memory [1] (still up to date, even
though it's almost 2 years old).
The runtime code should actually quite stable. Most of the code has been there for several
years (even before Flink was donated to the ASF) and we haven't seen many bugs reported for
the DataSet runtime. Of course this does not mean that the code doesn't contain bugs.

However, Flink does not take care of the user code. For example a GroupReduceFunction that
collects a lot of data, e.g., in a List on the heap, can still kill a program.
I would check if you have user functions that require lots of heap memory.
Also reducing the size of the managed memory to have more heap space available might help.
If that doesn't solve the problem, it would be good if you could share some details about
your job (which operators, which local strategies, how many operators) that might help to
identify the misbehaving operator.

Thanks, Fabian

[1] https://flink.apache.org/news/2015/05/11/Juggling-with-Bits-and-Bytes.html<https://urldefense.proofpoint.com/v2/url?u=https-3A__flink.apache.org_news_2015_05_11_Juggling-2Dwith-2DBits-2Dand-2DBytes.html&d=DgMFaQ&c=7563p3e2zaQw0AB1wrFVgyagb2IE5rTZOYPxLxfZlX4&r=rlkM70D3djmDN7dGPzzbVKG26ShcTFDMKlX5AWucE5Q&m=gXNF2FIfEb4pGn-GTNBdJ8q7RfWbahzA3eykq55STe0&s=PSRQ06vPRHlB-80MrNWAIGluVV4I-c7kZ35Dm-OIRzs&e=>

2017-04-19 16:09 GMT+02:00 Newport, Billy <Billy.Newport@gs.com<mailto:Billy.Newport@gs.com>>:
How does Flink use memory? We’re seeing cases when running a job on larger datasets where
it throws OOM exceptions during the job. We’re using the Dataset API. Shouldn’t flink
be streaming from disk to disk? We workaround by using fewer slots but it seems unintuitive
that I need to change these settings given Flink != Spark. Why isn’t Flinks memory usage
constant? Why couldn’t I run a job with a single task and a single slot for any size job
successfully other than it takes much longer to run.


View raw message