flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Stephan Ewen <se...@apache.org>
Subject Re: Cleanup of OperatorStates?
Date Tue, 01 Dec 2015 12:00:39 GMT
Just for clarification: The real-time results should also contain the
visitId, correct?

On Tue, Dec 1, 2015 at 12:06 PM, Stephan Ewen <sewen@apache.org> wrote:

> Hi Niels!
>
> If you want to use the built-in windowing, you probably need two window:
>   - One for ID assignment (that immediately pipes elements through)
>   - One for accumulating session elements, and then piping them into files
> upon session end.
>
> You may be able to use the rolling file sink (roll by 15 minutes) to store
> the files.
> That is probably the simplest to implement and will serve the real time
> case.
>
>
>                                     +--> (real time sink)
>                                     |
> (source) --> (window session ids) --+
>                                     |
>                                     +--> (window session) --> (rolling
> sink)
>
>
> You can put this all into one operator that accumulates the session
> elements but still immediately emits the new records (the realtime path),
> if you implement your own windowing/buffering in a custom function.
> This is also very easy to put onto event time then, which makes it
> valueable to process the history (replay). For this second case, still
> prototyping some code for the event time case, give me a bit, I'll get back
> at you...
>
> Greetings,
> Stephan
>
>
> On Tue, Dec 1, 2015 at 10:55 AM, Niels Basjes <Niels@basjes.nl> wrote:
>
>> Hi Stephan,
>>
>> I created a first version of the Visit ID assignment like this:
>>
>> First I group by sessionid and I create a Window per visit.
>> The custom Trigger for this window does a 'FIRE' after each element and
>> sets an EventTimer on the 'next possible moment the visit can expire'.
>> To avoid getting 'all events' in the visit after every 'FIRE' I'm using
>> CountEvictor.of(1).
>> When the visit expires I do a PURGE. So if there are more events
>> afterwards for the same sessionId I get a new visit (which is exactly what
>> I want).
>>
>> The last step I do is I want to have a 'normal' DataStream again to work
>> with.
>> I created this WindowFunction to map the Window stream back to  normal
>> DataStream
>> Essentially I do this:
>>
>> DataStream<Foo> visitDataStream = visitWindowedStream.apply(new
>> WindowToStream<Foo>())
>>
>> // This is an identity 'apply'
>> private static class WindowToStream<T> implements WindowFunction<T, T,
>> String, GlobalWindow> {
>>     @Override
>>     public void apply(String s, GlobalWindow window, Iterable<T> values,
>> Collector<T> out) throws Exception {
>>         for (T value: values) {
>>             out.collect(value);
>>         }
>>     }
>> }
>>
>>
>> The problem with this is that I first create the visitIds in a Window
>> (great).
>> Because I really need to have both the Windowed events AND the near
>> realtime version I currently break down the Window to get the single events
>> and after that I have to recreate the same Window again.
>>
>> I'm looking forward to the implementation direction you are referring to.
>> I hope you have a better way of doing this.
>>
>> Niels Basjes
>>
>>
>> On Mon, Nov 30, 2015 at 9:29 PM, Stephan Ewen <sewen@apache.org> wrote:
>>
>>> Hi Niels!
>>>
>>> Nice use case that you have!
>>> I think you can solve this super nicely with Flink, such that "replay"
>>> and "realtime" are literally the same program - they differ only in whether
>>>
>>> Event time is, like you said, the key thing for "replay". Event time
>>> depends on the progress in the timestamps of the data, so it can progress
>>> at different speeds, depending on what the rate of your stream is.
>>> With the appropriate data source, it will progress very fast in "replay
>>> mode", so that you replay in "fast forward speed", and it progresses at the
>>> same speed as processing time when you attach to the end of the Kafka queue.
>>>
>>> When you define the time intervals in your program to react to event
>>> time progress, then you will compute the right sessionization in both
>>> replay and real time settings.
>>>
>>> I am writing a little example code to share. The type of ID-assignment
>>> sessions you want to do need an undocumented API right now, so I'll prepare
>>> something there for you...
>>>
>>> Greetings,
>>> Stephan
>>>
>>>
>>>
>>> On Sun, Nov 29, 2015 at 4:04 PM, Niels Basjes <Niels@basjes.nl> wrote:
>>>
>>>> Hi,
>>>>
>>>> The sessionid is present in the measurements. It can also be seen as a
>>>> form of 'browser id'.
>>>> Most websites use either a 'long lived random value in a cookie' or a
>>>> 'application session id' for this.
>>>>
>>>> So with the id of the browser in hand I have the need to group all
>>>> events into "periods of activity" which I call a visit.
>>>> Such a visit is a bounded subset of all events from a single browser.
>>>>
>>>> What I need is to add a (sort of) random visit id to the events that
>>>> becomes 'inactive' after more than X minutes of inactivity.
>>>> I then want to add this visitid to each event and
>>>> 1) stream them out in realtime
>>>> 2) Wait till the visit ends and store the complete visit on disk (I am
>>>> going for either AVRO or Parquet).
>>>>
>>>> I want to create diskfiles with all visits that ended in a specific
>>>> time period. So essentially
>>>>         "Group by round(<timestamp of last event>, 15 minutes)"
>>>>
>>>>
>>>> Because of the need to be able to 'repair' things I came with the
>>>> following question:
>>>> In the Flink API I see the 'process time' (i.e. the actual time of the
>>>> server) and the 'event time' (i.e. the time when and event was recorded).
>>>>
>>>> Now in my case all events are in Kafka (for say 2 weeks).
>>>> When something goes wrong I want to be able to 'reprocess' everything
>>>> from the start of the queue.
>>>> Here the matter of 'event time' becomes a big question for me; In those
>>>> 'replay' situations the event time will progress at a much higher speed
>>>> than the normal 1sec/sec.
>>>>
>>>> How does this work in Apache Flink?
>>>>
>>>>
>>>> Niels Basjes
>>>>
>>>>
>>>> On Fri, Nov 27, 2015 at 3:28 PM, Stephan Ewen <sewen@apache.org> wrote:
>>>>
>>>>> Hey Niels!
>>>>>
>>>>> You may be able to implement this in windows anyways, depending on
>>>>> your setup. You can definitely implement state with timeout yourself
(using
>>>>> the more low-level state interface), or you may be able to use custom
>>>>> windows for that (they can trigger on every element and return elements
>>>>> immediately, thereby giving you low latency).
>>>>>
>>>>> Can you tell me where exactly the session ID comes from? Is that
>>>>> something that the function with state generates itself?
>>>>> Depending on that answer, I can outline either the window, or the
>>>>> custom state way...
>>>>>
>>>>> Greetings,
>>>>> Stephan
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Fri, Nov 27, 2015 at 2:19 PM, Niels Basjes <Niels@basjes.nl>
wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> Thanks for the explanation.
>>>>>> I have clickstream data arriving in realtime and I need to assign
the
>>>>>> visitId and stream it out again (with the visitId now begin part
of the
>>>>>> record) into Kafka with the lowest possible latency.
>>>>>> Although the Window feature allows me to group and close the visit
on
>>>>>> a timeout/expire (as shown to me by Aljoscha in a separate email)
it does
>>>>>> make a 'window'.
>>>>>>
>>>>>> So (as requested) I created a ticket for such a feature:
>>>>>> https://issues.apache.org/jira/browse/FLINK-3089
>>>>>>
>>>>>> Niels
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Fri, Nov 27, 2015 at 11:51 AM, Stephan Ewen <sewen@apache.org>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Niels!
>>>>>>>
>>>>>>> Currently, state is released by setting the value for the key
to
>>>>>>> null. If you are tracking web sessions, you can try and send
a "end of
>>>>>>> session" element that sets the value to null.
>>>>>>>
>>>>>>> To be on the safe side, you probably want state that is
>>>>>>> automatically purged after a while. I would look into using Windows
for
>>>>>>> that. The triggers there are flexible so you can schedule both
actions on
>>>>>>> elements plus cleanup after a certain time delay (clock time
or event time).
>>>>>>>
>>>>>>> The question about "state expiry" has come a few times. People
seem
>>>>>>> to like working on state directly, but it should clean up automatically.
>>>>>>>
>>>>>>> Can you see if your use case fits onto windows, otherwise open
a
>>>>>>> ticket for state expiry?
>>>>>>>
>>>>>>> Greetings,
>>>>>>> Stephan
>>>>>>>
>>>>>>>
>>>>>>> On Thu, Nov 26, 2015 at 10:42 PM, Niels Basjes <Niels@basjes.nl>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> I'm working on a streaming application that ingests clickstream
>>>>>>>> data.
>>>>>>>> In a specific part of the flow I need to retain a little
bit of
>>>>>>>> state per visitor (i.e. keyBy(sessionid) )
>>>>>>>>
>>>>>>>> So I'm using the Key/Value state interface (i.e. OperatorState<
>>>>>>>> MyRecord>) in a map function.
>>>>>>>>
>>>>>>>> Now in my application I expect to get a huge number of sessions
per
>>>>>>>> day.
>>>>>>>> Since these sessionids are 'random' and become unused after
the
>>>>>>>> visitor leaves the website over time the system will have
seen millions of
>>>>>>>> those sessionids.
>>>>>>>>
>>>>>>>> So I was wondering: how are these OperatorStates cleaned?
>>>>>>>>
>>>>>>>>
>>>>>>>> --
>>>>>>>> Best regards / Met vriendelijke groeten,
>>>>>>>>
>>>>>>>> Niels Basjes
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> Best regards / Met vriendelijke groeten,
>>>>>>
>>>>>> Niels Basjes
>>>>>>
>>>>>
>>>>>
>>>>
>>>>
>>>> --
>>>> Best regards / Met vriendelijke groeten,
>>>>
>>>> Niels Basjes
>>>>
>>>
>>>
>>
>>
>> --
>> Best regards / Met vriendelijke groeten,
>>
>> Niels Basjes
>>
>
>

Mime
View raw message