flink-user mailing list archives

From ashish pok <ashish...@yahoo.com>
Subject Re: Questions on Unbounded number of keys
Date Tue, 31 Jul 2018 12:04:06 GMT
Thanks Till, I will try to create an instance of the app with a smaller heap and get a couple of
dumps as well. I should be OK to share them on Google Drive.

- Ashish

On Tuesday, July 31, 2018, 7:49 AM, Till Rohrmann <trohrmann@apache.org> wrote:

Hi Ashish,
FIRE_AND_PURGE should also clear the window state. Yes, by active windows I mean windows
which have not been purged yet.
Maybe Aljoscha knows more about why the window state is growing (I would not rule out a bug).
On Tue, Jul 31, 2018 at 1:45 PM ashish pok <ashishpok@yahoo.com> wrote:

Hi Till,
Keys are unbounded (a group of events has the same key, but that key doesn't repeat after it is
fired, other than for some odd delayed events). So basically there is 1 key that will be aligned to
a window. When you say key space of active windows, does that include keys for windows that
have already fired and could still be in the memory footprint? If so, that is basically the problem
I am running into, and I am looking for a way to clean it up. Like I said earlier, overriding the trigger
to FIRE_AND_PURGE did not help. If I take the same stream and key and refactor it to how Chang
is doing it with a Process Function, the issue goes away.
If you mean only the key space of windows that are currently being processed (not the ones that have
already fired), then I would say that cannot be the case. We are getting the data from a periodic
poll of the same number of devices, and the uniqueness of the key is simply a time identifier prefixed
to the device identifier. Even though there could be a little delayed data, the chance of the number
of unique keys growing constantly for days is probably none, as the device list is constant.
Thanks, Ashish

- Ashish

On Tuesday, July 31, 2018, 4:05 AM, Till Rohrmann <trohrmann@apache.org> wrote:

Hi Ashish,
the processing time session windows need to store state in the StateBackends and I assume
that your key space of active windows is constantly growing. That could explain why you are
seeing an ever-increasing memory footprint. But without knowing the input stream and what
the UDFs do, this is only a guess.
On Mon, Jul 30, 2018 at 1:43 PM Fabian Hueske <fhueske@gmail.com> wrote:

Hi Chang,
The state handle objects are not created per key but just once per function instance.
Instead they route state accesses to the backend (JVM heap or RocksDB) for the currently active key.
Best, Fabian
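
To make Fabian's point concrete, here is a minimal toy model in plain Scala. It is not Flink code: `ToyBackend`, `ToyValueState` and `ToyStateDemo` are hypothetical names invented for this illustration. It only shows the idea that a single handle object can serve any number of keys because the per-key data lives in the backend, and that clearing a key removes its entry.

```scala
import scala.collection.mutable

// Toy stand-in for a keyed state backend (hypothetical, not part of Flink):
// one table per state name, each table mapping record key -> value.
final class ToyBackend {
  var currentKey: String = ""
  private val tables = mutable.Map.empty[String, mutable.Map[String, Any]]

  def table(stateName: String): mutable.Map[String, Any] =
    tables.getOrElseUpdate(stateName, mutable.Map.empty[String, Any])

  // Number of (key, value) entries currently stored across all states.
  def entryCount: Int = tables.values.map(_.size).sum
}

// Toy state handle: created once, holds no per-key data itself.
// Every access is routed to the backend entry of the current key.
final class ToyValueState[T](name: String, backend: ToyBackend) {
  def value(): Option[T] =
    backend.table(name).get(backend.currentKey).map(_.asInstanceOf[T])
  def update(v: T): Unit = backend.table(name).update(backend.currentKey, v)
  def clear(): Unit = { backend.table(name).remove(backend.currentKey); () }
}

object ToyStateDemo {
  // Pushes `n` distinct session ids through ONE handle, clearing each afterwards,
  // and returns how many entries are left in the backend.
  def run(n: Int): Int = {
    val backend = new ToyBackend
    val userId = new ToyValueState[String]("userId", backend) // single handle
    for (i <- 1 to n) {
      backend.currentKey = s"session-$i" // the runtime would set this per record
      userId.update(s"user-$i")
      userId.clear()
    }
    backend.entryCount
  }
}
```

In this model the number of handle objects stays at one regardless of `n`; only backend entries scale with the number of keys, and they disappear once cleared.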

2018-07-30 12:19 GMT+02:00 Chang Liu <fluency.03@gmail.com>:

Hi Andrey,
Thanks for your reply. My question might be silly, but there is still one part I would like
to fully understand. Consider the following example:
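import org.apache.flink.api.common.state.{ListState, ListStateDescriptor, ValueState, ValueStateDescriptor}
import org.apache.flink.api.common.typeinfo.BasicTypeInfo
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._ // provides createTypeInformation
import org.apache.flink.util.Collector

// Click is the event type of the stream; its definition is not shown here.
class MyFunction extends KeyedProcessFunction[String, Click, Click] { // keyed by Session
  lazy val userId: ValueState[String] = getRuntimeContext.getState(
    new ValueStateDescriptor[String]("userId", BasicTypeInfo.STRING_TYPE_INFO))

  lazy val clicks: ListState[Click] = getRuntimeContext.getListState(
    new ListStateDescriptor[Click]("clicks", createTypeInformation[Click]))

  override def processElement(
      click: Click,
      context: KeyedProcessFunction[String, Click, Click]#Context,
      out: Collector[Click])
  : Unit = {
    // process, output, clear state if necessary
  }

  override def onTimer(
      timestamp: Long,
      ctx: KeyedProcessFunction[String, Click, Click]#OnTimerContext,
      out: Collector[Click])
  : Unit = {
    // output and clear state
  }
}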
Even though I am regularly clearing the two states, userId and clicks (which means I am cleaning
up the values stored in the States), my question is: what about the two State objects
themselves, userId and clicks? These State objects are also created per Session ID, right?
If the number of Session IDs is unbounded, then the number of these State objects is also unbounded.
That means there are userId-state-1 and clicks-state-1 for session-id-1, userId-state-2 and
clicks-state-2 for session-id-2, userId-state-3 and clicks-state-3 for session-id-3, …,
which are handled by different keyed operator instances (or by the same one, if two keys from different
ranges, as you call them, are assigned to it).
I am not concerned about the actual values in the State (which will be managed properly if I am
clearing them carefully). I am thinking about the State objects themselves: I have no
idea what is happening to them or what will happen to them.
Many thanks :)
Best regards/祝好,

Chang Liu 刘畅

On 26 Jul 2018, at 10:55, Andrey Zagrebin <andrey@data-artisans.com> wrote:
Hi Chang Liu,
The unbounded nature of the stream, keyed or not, should not by itself lead to running out of memory.
The number of parallel keyed operator instances in Flink is fixed (the parallelism), and each instance
just processes some range of keyed elements; in your example, a subrange of session IDs.
The keyed elements being processed (HTTP requests) are objects created when they enter the pipeline
and garbage collected after they have been processed in streaming fashion.
If they arrive very rapidly, this can lead to high back pressure between upstream and downstream
operators: buffers can become full and the pipeline stops or slows down processing external inputs.
This usually means that your pipeline is under-provisioned.
The only accumulated data comes from state (windows, user state etc.), so if you control its
memory consumption, as Till described, there should be no other source of out-of-memory errors.
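
As a rough illustration of why the number of operator instances does not grow with the number of keys, the sketch below maps arbitrary keys onto a fixed set of subtasks. It is loosely modelled on Flink's key-group assignment, but simplified: Flink additionally applies a murmur hash to the key's hash code, and the object and method names here (`KeyAssignment`, `KeyAssignmentDemo`) are made up for this example.

```scala
object KeyAssignment {
  // Simplified key-group computation: plain hashCode modulo maxParallelism.
  // (Assumption: real Flink hashes key.hashCode once more before the modulo.)
  def keyGroup(key: Any, maxParallelism: Int): Int =
    java.lang.Math.floorMod(key.hashCode, maxParallelism)

  // Maps the key's key group onto one of `parallelism` subtasks (0 until parallelism).
  def subtaskIndex(key: Any, maxParallelism: Int, parallelism: Int): Int =
    keyGroup(key, maxParallelism) * parallelism / maxParallelism
}

object KeyAssignmentDemo {
  // Returns the set of subtask indices hit by `numKeys` distinct session ids.
  def distinctSubtasks(numKeys: Int, maxParallelism: Int, parallelism: Int): Set[Int] =
    (1 to numKeys)
      .map(i => KeyAssignment.subtaskIndex(s"session-$i", maxParallelism, parallelism))
      .toSet
}
```

However many session IDs are fed in, `distinctSubtasks` can never return more than `parallelism` indices: each subtask simply handles a subrange of the key space, as described above.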

On 25 Jul 2018, at 19:06, Chang Liu <fluency.03@gmail.com> wrote:
Hi Till,
Thanks for your reply. But I think maybe I did not make my question clear. My question is
not about whether the States within each keyed operator instance will run out of memory.
My question is about whether the unlimited keyed operator instances themselves will run out
of memory.
So to reply to your answers: whether I use a different State backend or regularly clean
up the States (which is exactly what I am doing), neither affects the number of keyed
operator instances.
I would like to know:   
   - Will the number of keyed operator instances (Java objects?) grow unbounded? 
   - If so, will they run out of memory? This is not actually related to the memory used by
the keyed State inside.
   - If not, then how does Flink manage these multiple keyed operator instances?

I think this needs more knowledge about how Flink works internally to understand how keyed
operator instances are created, maintained and destroyed. That’s why I would like your help
understanding this.
Many Thanks.
Best regards/祝好,

Chang Liu 刘畅

On 24 Jul 2018, at 14:31, Till Rohrmann <trohrmann@apache.org> wrote:
Hi Chang Liu,
if you are dealing with an unlimited number of keys and keep state around for every key, then
your state size will keep growing with the number of keys. If you are using the FsStateBackend,
which keeps working state in memory (on the JVM heap), you will eventually run into an
OutOfMemoryError. One way to solve/mitigate this problem is to use the RocksDBStateBackend,
which can go out of core.
Alternatively, you would need to clean up your state before you run out of memory. One way
to do this is to register a timer for every key which clears the state. But this only works
if you don't amass too much state data before the timer is triggered. If you like, this solution
is a kind of poor man's state TTL. The Flink community is currently developing a proper
implementation of it which does not rely on additional timers (which increase the state footprint) [1].
[1] https://issues.apache.org/jira/browse/FLINK-9510
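
The timer-based cleanup Till describes can be simulated with a small dependency-free model. This is only a sketch of the idea, not Flink code: `TimerCleanupModel` and its methods are hypothetical. In a real job the same pattern would roughly correspond to registering a timer through the context's timer service in `processElement` and calling `clear()` on the state in `onTimer`.

```scala
import scala.collection.mutable

// Toy model (hypothetical, not Flink API): per-key state plus one cleanup timer per key.
final class TimerCleanupModel(ttlMillis: Long) {
  private val state  = mutable.Map.empty[String, List[String]]
  private val timers = mutable.Map.empty[String, Long] // key -> time at which to clear

  // Called per record: append to the key's state and (re)arm its cleanup timer.
  def processElement(key: String, value: String, now: Long): Unit = {
    state.update(key, value :: state.getOrElse(key, Nil))
    timers.update(key, now + ttlMillis)
  }

  // Called as time advances: fire every due timer and drop that key's state.
  def advanceTo(now: Long): Unit = {
    val due = timers.collect { case (k, t) if t <= now => k }.toList
    due.foreach { k =>
      state.remove(k)
      timers.remove(k)
    }
  }

  def liveKeys: Int = state.size
  def pendingTimers: Int = timers.size
}
```

With one new key per millisecond and a TTL of 1000 ms, the model never holds more than about 1000 keys at once, however many keys pass through in total. It also makes the cost visible: `pendingTimers` is extra bookkeeping per live key, which is the additional footprint mentioned above.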
On Tue, Jul 24, 2018 at 10:11 AM Chang Liu <fluency.03@gmail.com> wrote:

Dear All,
I have questions regarding the keys. In general, the questions are:   
   - What happens if I do a keyBy on an unlimited number of keys? How does Flink manage
each KeyedStream under the hood? Will I get a memory overflow, for example, if every KeyedStream
associated with a specific key takes a certain amount of memory?
   - BTW, I think it is fair to say that I have to clear my KeyedState so that the memory
used by this State is cleaned up regularly. But still, I am wondering: even though I am
regularly cleaning up State memory, what happens to the memory used by the KeyedStream itself,
if there is any? And will it explode?

Let me give an example to make this clear. Let’s say we have a
 val requestStream: DataStream[HttpRequest]
which is a stream of HTTP requests. By using the session ID as the key, we can obtain
a KeyedStream per single session, as follows:
        val streamPerSession: KeyedStream[HttpRequest, String] = requestStream.keyBy(_.sessionId)
However, the session IDs are actually hash codes generated randomly by the Web service/application,
which means the number of sessions is unlimited (which is reasonable, because every time
a user opens the application or logs in, he/she will get a new unique session).
Then, the question is: will Flink eventually run out of memory because the number of sessions
is unlimited (and because we are keying by the session ID)?   
   - If so, how can we properly manage this situation?
   - If not, could you help me understand WHY?
   - Let’s also assume that we are regularly clearing the KeyedState, so the memory used
by the State will not explode. 

Many Thanks and Looking forward to your reply :)

Best regards/祝好,

Chang Liu 刘畅
