flink-user mailing list archives

From Jörn Franke <jornfra...@gmail.com>
Subject Re: Capacity Planning For Large State in YARN Cluster
Date Sun, 29 Oct 2017 22:05:35 GMT
Well, you can only performance-test it beforehand in different scenarios with different configurations.

I am not sure what exactly your state holds (e.g. how many objects etc.), but if it is Java objects
then 3 times might be a little low (it also depends on how you initially measured the state size)
- however, Flink optimizes this as well. Nevertheless, something like RocksDB is probably a
better solution for larger states.
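
For illustration, switching the job to RocksDB is roughly this (an untested sketch against the 1.3.x API; the checkpoint URI is just a placeholder):

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RocksDbBackedJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // keep working state on local disk in RocksDB and checkpoint it to a durable filesystem;
        // the second argument enables incremental checkpoints (available since 1.3)
        env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints", true));
        // ... define sources/operators and call env.execute() as usual ...
    }
}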

> On 29. Oct 2017, at 21:15, Ashish Pokharel <ashishpok@yahoo.com> wrote:
> Hi Till,
> I got the same feedback from Robert Metzger over on Stack Overflow. I have switched my app to use RocksDB and yes, it did stabilize the app :)
> However, I am still struggling with how to map out my TM and JM memory, number of slots per TM, etc. Currently I am using 60 slots with 10 TMs and 60 GB of total cluster memory.
> The idea was to distribute the state and have approx. 1 GB of memory per slot. I have also changed the containerized.heap-cutoff-ratio config to 0.3 to allow a little room for RocksDB
> (RocksDB is using the basic spinning-disk-optimized pre-defined configs, but we do have SSDs on our Prod machines that we can leverage in the future too) and set taskmanager.memory.off-heap to true.
> It feels more experimental at this point than an exact science :) If there are any further guidelines on how we can plan for this as we open up the floodgates to heavy continuous streams,
> that would be great.
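> For reference, the relevant flink-conf.yaml entries currently look roughly like this (values illustrative, not a recommendation):
>
>   state.backend: rocksdb
>   containerized.heap-cutoff-ratio: 0.3      # leave ~30% of each YARN container outside the JVM heap for RocksDB
>   taskmanager.memory.off-heap: true
>   taskmanager.numberOfTaskSlots: 6          # 10 TMs x 6 slots = 60 slots
>   taskmanager.heap.mb: 6144                 # ~6 GB per TM, i.e. ~1 GB per slot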
> Thanks again,
> Ashish
>> On Oct 27, 2017, at 8:45 AM, Till Rohrmann <trohrmann@apache.org> wrote:
>> Hi Ashish,
>> what you are describing should be a good use case for Flink and it should be able to run your program.
>> When you see a GC overhead limit exceeded error, it means that Flink or your program is creating too many or too large objects, filling up the memory in a short time.
>> I would recommend checking your user program to see whether you can avoid unnecessary object instantiations and whether it is possible to reuse created objects.
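>> For example (a toy sketch, not your code; reusing the output instance is only safe when the downstream operator does not buffer the records):
>>
>> import org.apache.flink.api.common.functions.MapFunction;
>> import org.apache.flink.api.java.tuple.Tuple2;
>>
>> // reuse one mutable output instance instead of allocating a new object per record
>> public class EnrichFn implements MapFunction<String, Tuple2<String, Long>> {
>>     private final Tuple2<String, Long> reuse = new Tuple2<>();
>>
>>     @Override
>>     public Tuple2<String, Long> map(String key) {
>>         reuse.f0 = key;
>>         reuse.f1 = System.currentTimeMillis();
>>         return reuse;   // same instance on every call, so far fewer short-lived objects for the GC
>>     }
>> }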
>> Concerning Flink's state backends, the memory state backend is currently not able to spill to disk. Also, the managed memory is only relevant for DataSet/batch programs and not for streaming programs.
>> Therefore, I would recommend trying out the RocksDB state backend, which is able to gracefully spill to disk if the state size grows too large. Consequently, you don't have to adjust the managed memory settings, because they currently don't have an effect on streaming programs.
>> My gut feeling is that switching to the RocksDBStateBackend could already solve your problems. If that is not the case, please let me know.
>> Cheers,
>> Till
>>> On Fri, Oct 27, 2017 at 5:27 AM, Ashish Pokharel <ashishpok@yahoo.com> wrote:
>>> Hi Everyone,
>>> We have hit a roadblock moving an app to production scale and were hoping to get some guidance. The application is a pretty common use case in stream processing but does require maintaining a large number of keyed states.
>>> We are processing 2 streams - one is a daily burst stream (normally around 50 million events, but it could go up to 100 million in a one-hour burst) and the other is a constant stream of around 70-80 million events per hour.
>>> We are doing a low-level join between the two keyed streams using a CoProcessFunction. The CoProcessFunction needs to refresh (upsert) state from the daily burst stream and decorate the constantly streaming data with values from the state built from the bursty stream.
>>> All of the logic is working pretty well in a standalone Dev environment. We are throwing about 500k events of bursty traffic at the state and about 2-3 million events of the data stream. We have 1 TM with 16 GB memory, 1 JM with 8 GB memory, and 16 slots (1 per core) on the server.
>>> We have been taking savepoints in case we need to restart the app with code changes etc., and the app does seem to recover from state very well. Based on the savepoints, the total volume of state in the production flow should be around 25-30 GB.
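>>> For context, the join is conceptually along these lines (heavily simplified; Lookup/Event/Enriched are made-up types whose definitions I am omitting):
>>>
>>> import org.apache.flink.api.common.state.ValueState;
>>> import org.apache.flink.api.common.state.ValueStateDescriptor;
>>> import org.apache.flink.configuration.Configuration;
>>> import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
>>> import org.apache.flink.util.Collector;
>>>
>>> // keyed low-level join: stream 1 upserts the lookup state, stream 2 is decorated from it
>>> public class DecorateFn extends CoProcessFunction<Lookup, Event, Enriched> {
>>>     private transient ValueState<Lookup> lookupState;
>>>
>>>     @Override
>>>     public void open(Configuration conf) {
>>>         lookupState = getRuntimeContext().getState(
>>>                 new ValueStateDescriptor<>("lookup", Lookup.class));
>>>     }
>>>
>>>     @Override
>>>     public void processElement1(Lookup l, Context ctx, Collector<Enriched> out) throws Exception {
>>>         lookupState.update(l);            // refresh (upsert) from the daily burst stream
>>>     }
>>>
>>>     @Override
>>>     public void processElement2(Event e, Context ctx, Collector<Enriched> out) throws Exception {
>>>         Lookup l = lookupState.value();   // decorate the constant stream with the stored values
>>>         if (l != null) {
>>>             out.collect(new Enriched(e, l));
>>>         }
>>>     }
>>> }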
>>> At this point, however, we are trying to deploy the app at production scale. The app also has a flag that can be set at startup time to ignore the data stream so we can simply initialize state. So basically we are trying to see if we can initialize the state first and take a savepoint as a test.
>>> At this point we are using 10 TMs with 4 slots and 8 GB memory each (the idea was to allocate around 3 times the estimated state size to start with), but TMs keep getting killed by YARN with a GC Overhead Limit Exceeded error.
>>> We have gone through quite a few blogs/docs on Flink managed memory, off-heap vs. heap memory, disk spill-over, state backends, etc. We did try to tweak the managed-memory configs in multiple ways (off/on heap, fraction, network buffers, etc.) but can't seem to figure out a good way to fine-tune the app to avoid these issues.
>>> Ideally, we would hold state in memory (we do have enough capacity in the Production environment for it) for performance reasons and spill over to disk (which I believe Flink should provide out of the box?). It feels like 3x the anticipated state volume in cluster memory should have been enough to just initialize the state.
>>> So instead of just continuing to increase memory (which may or may not help, as the error is about GC overhead), we wanted to get some input from experts on best practices and an approach to plan this application better.
>>> Appreciate your input in advance!
