flink-user mailing list archives

From Niels Basjes <Ni...@basjes.nl>
Subject Re: Writing groups of Windows to files
Date Tue, 04 Jul 2017 11:49:36 GMT
I think I understand.

Since the entire session must fit into memory anyway, I'm going to try to
create a new data structure which simply contains the 'entire session' and
simply use a Window/BucketingSink construct to ship them to files.
I do need to ensure no one can OOM the system by capping the maximum number
of events in a session to a value that is only reached by robots (for which
the whole idea of sessions is bogus anyway).
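
A minimal sketch of such a capped 'entire session' container, in plain Java
with no Flink dependency. The class name CappedSession, its fields, and the
choice to drop (and count) events past the cap are all assumptions made for
illustration; none of this is an existing Flink API.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/**
 * Hypothetical container holding all events of one session, up to a cap.
 * Events arriving after the cap is reached are counted but not stored.
 */
final class CappedSession<E> {
    private final String sessionId;
    private final int maxEvents;
    private final List<E> events = new ArrayList<>();
    private long droppedCount = 0;

    CappedSession(String sessionId, int maxEvents) {
        if (maxEvents <= 0) {
            throw new IllegalArgumentException("maxEvents must be positive");
        }
        this.sessionId = sessionId;
        this.maxEvents = maxEvents;
    }

    /** Stores the event and returns true, or returns false if the cap was hit. */
    boolean add(E event) {
        if (events.size() >= maxEvents) {
            droppedCount++;
            return false;
        }
        events.add(event);
        return true;
    }

    String getSessionId() { return sessionId; }

    /** Read-only view of the stored events, in insertion order. */
    List<E> getEvents() { return Collections.unmodifiableList(events); }

    /** True if at least one event was dropped because of the cap. */
    boolean isTruncated() { return droppedCount > 0; }

    long getDroppedCount() { return droppedCount; }
}
```

The window function would loop over its Iterable, call add() for each event,
and emit one CappedSession per session. Note that this only bounds the size
of the emitted record; it does not by itself bound what the window keeps in
state before it fires.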

Thanks for the tips.

Niels

On Tue, Jul 4, 2017 at 12:07 PM, Fabian Hueske <fhueske@gmail.com> wrote:

> You are right. Building a large record might result in an OOME.
> But I think that would also happen with a regular SessionWindow,
> RocksDBStateBackend, and a WindowFunction that immediately ships the
> records it receives from the Iterable.
> As far as I know, a SessionWindow stores all elements in an internal
> ListState. Before iterating over a ListState, the RocksDBStateBackend
> completely deserializes the list into an ArrayList, i.e., all records of a
> session would be on the heap already.
>
> If this is a situation that you have to avoid, I think the only way to do
> this is to implement the sessionization yourself with a ProcessFunction and
> MapState (keyed on timestamp). This would be a non-trivial task though.
>
> In case the SessionWindow + WindowFunction is OK for you (or you
> implemented your own sessionization logic, e.g., in a ProcessFunction), you
> could just forward the elements to a modified BucketingSink.
> Your version of the BucketingSink would need to ensure that files are not
> closed between checkpoints, i.e., a file may only be closed when a
> checkpoint barrier is received. Right now, files are closed based on a
> timer and/or file size.
> Since all records of a session are emitted by a single WindowFunction
> call, these records won't be interrupted by a barrier. Hence, you'll have a
> "consistent" state for all windows when a checkpoint is triggered.
>
> I'm afraid I'm not aware of a simpler solution for this use case.
>
> Hope it helps, Fabian
>
> 2017-07-04 11:24 GMT+02:00 Niels Basjes <Niels@basjes.nl>:
>
>> Hi Fabian,
>>
>> On Fri, Jun 30, 2017 at 6:27 PM, Fabian Hueske <fhueske@gmail.com> wrote:
>>
>>> If I understand your use case correctly, you'd like to hold back all
>>> events of a session until it ends/times out and then write all events out.
>>> So, instead of aggregating per session (the common use case), you'd just
>>> like to collect the events.
>>>
>>
>> Yes, and I want to write the completed sessions into files. No
>> aggregation or filtering at all.
>> The idea is that our DataScience guys who want to analyze sessions have a
>> much easier task of knowing for certain that they have 'a set of complete
>> sessions'.
>>
>>
>>> I would implement a simple WindowFunction that just forwards all events
>>> that it receives from the iterator. Conceptually, the window will just
>>> collect the events and emit them when the session ended/timed out.
>>> Then you can add a BucketingSink which writes out the events. I'm not sure
>>> if the BucketingSink supports buckets based on event-time though. Maybe you
>>> would need to adapt it a bit to guarantee that all rows of the same session
>>> are written to the same file.
>>> Alternatively, the WindowFunction could also emit one large record which
>>> is a List or Array of events belonging to the same session.
>>>
>>
>> That last one was the idea I had.
>> Have a window function that keeps the Window until finished, then output
>> that with the event time of the 'end of the session' and use the bucketing
>> sink to write those to disk.
>>
>> The problem (in my mind) that I have with this is that a single session
>> with a LOT of events would bring the system to a halt because it can
>> trigger OOM events.
>>
>> How should I handle those?
>>
>> Niels
>>
>
>


-- 
Best regards / Met vriendelijke groeten,

Niels Basjes
