flink-user mailing list archives

From Stephan Ewen <se...@apache.org>
Subject Re: Cleanup of OperatorStates?
Date Mon, 30 Nov 2015 20:29:34 GMT
Hi Niels!

Nice use case that you have!
I think you can solve this super nicely with Flink, such that "replay" and
"realtime" are literally the same program - they differ only in whether

Event time is, like you said, the key thing for "replay". Event time
depends on the progress in the timestamps of the data, so it can progress
at different speeds, depending on what the rate of your stream is.
With the appropriate data source, it will progress very fast in "replay
mode", so that you replay in "fast forward speed", and it progresses at the
same speed as processing time when you attach to the end of the Kafka queue.
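
To make that concrete, here is a minimal plain-Java sketch of a clock that is driven only by the timestamps in the data. It is not Flink's API (Flink tracks event-time progress with watermarks). The class name `EventTimeClock` and the `maxOutOfOrderMillis` bound are assumptions made up for this illustration.

```java
/** Illustrative only: an event-time clock driven by the data, not by the wall clock. */
class EventTimeClock {
    // Assumed bound on how far events may arrive out of order (hypothetical parameter).
    private final long maxOutOfOrderMillis;
    private long maxTimestamp = Long.MIN_VALUE;

    EventTimeClock(long maxOutOfOrderMillis) {
        this.maxOutOfOrderMillis = maxOutOfOrderMillis;
    }

    /** Observes one event timestamp and returns the current event time; it never moves backwards. */
    long observe(long eventTimestamp) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
        return maxTimestamp - maxOutOfOrderMillis;
    }
}
```

During a replay the timestamps arrive in quick succession, so this clock races ahead. At the end of the queue they arrive about one second per second, so it moves at roughly the speed of processing time. The code is the same in both cases.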

When you define the time intervals in your program to react to event time
progress, then you will compute the right sessionization in both replay and
real time settings.
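
As a rough illustration of the idea (plain Java, not the Flink API, and not the example code mentioned below), a visit id can be assigned from event timestamps alone. The names `VisitAssigner`, `assign` and `expire` are hypothetical. The sketch assumes events arrive roughly in timestamp order per browser, and it derives the visit id from the browser id and the first timestamp of the visit instead of a random value, so that a replay produces the same ids.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

/** Illustrative only: assigns visit ids using event timestamps, never the wall clock. */
class VisitAssigner {
    private static final class Visit {
        final String visitId;
        long lastEventTime;

        Visit(String visitId, long lastEventTime) {
            this.visitId = visitId;
            this.lastEventTime = lastEventTime;
        }
    }

    private final long gapMillis;
    private final Map<String, Visit> open = new HashMap<>();

    VisitAssigner(long gapMillis) {
        this.gapMillis = gapMillis;
    }

    /** Returns the visit id for an event; a new visit starts after more than gapMillis of inactivity. */
    String assign(String browserId, long eventTime) {
        Visit visit = open.get(browserId);
        if (visit == null || eventTime - visit.lastEventTime > gapMillis) {
            // Deterministic id (an assumption of this sketch) so replay and real time agree.
            visit = new Visit(browserId + "@" + eventTime, eventTime);
            open.put(browserId, visit);
        } else {
            visit.lastEventTime = Math.max(visit.lastEventTime, eventTime);
        }
        return visit.visitId;
    }

    /** Drops the state of visits that have been inactive for more than gapMillis at the given event time. */
    int expire(long currentEventTime) {
        int removed = 0;
        Iterator<Map.Entry<String, Visit>> it = open.entrySet().iterator();
        while (it.hasNext()) {
            if (currentEventTime - it.next().getValue().lastEventTime > gapMillis) {
                it.remove();
                removed++;
            }
        }
        return removed;
    }

    int openVisits() {
        return open.size();
    }
}
```

Because both `assign` and `expire` look only at event time, feeding the same events at replay speed or at live speed gives the same visits, and the per-browser state is released once event time has moved past the inactivity gap.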

I am writing a little example code to share. The kind of ID-assigning
sessions you want needs an undocumented API right now, so I'll prepare
something there for you...

Greetings,
Stephan



On Sun, Nov 29, 2015 at 4:04 PM, Niels Basjes <Niels@basjes.nl> wrote:

> Hi,
>
> The sessionid is present in the measurements. It can also be seen as a
> form of 'browser id'.
> Most websites use either a 'long lived random value in a cookie' or a
> 'application session id' for this.
>
> So with the id of the browser in hand I have the need to group all events
> into "periods of activity" which I call a visit.
> Such a visit is a bounded subset of all events from a single browser.
>
> What I need is to add a (sort of) random visit id to the events that
> becomes 'inactive' after more than X minutes of inactivity.
> I then want to add this visitid to each event and
> 1) stream them out in realtime
> 2) Wait till the visit ends and store the complete visit on disk (I am
> going for either AVRO or Parquet).
>
> I want to create diskfiles with all visits that ended in a specific time
> period. So essentially
>         "Group by round(<timestamp of last event>, 15 minutes)"
>
>
> Because of the need to be able to 'repair' things I came up with the
> following question:
> In the Flink API I see the 'process time' (i.e. the actual time of the
> server) and the 'event time' (i.e. the time when an event was recorded).
>
> Now in my case all events are in Kafka (for say 2 weeks).
> When something goes wrong I want to be able to 'reprocess' everything from
> the start of the queue.
> Here the matter of 'event time' becomes a big question for me: in those
> 'replay' situations the event time will progress at a much higher speed
> than the normal 1sec/sec.
>
> How does this work in Apache Flink?
>
>
> Niels Basjes
>
>
> On Fri, Nov 27, 2015 at 3:28 PM, Stephan Ewen <sewen@apache.org> wrote:
>
>> Hey Niels!
>>
>> You may be able to implement this in windows anyways, depending on your
>> setup. You can definitely implement state with timeout yourself (using the
>> more low-level state interface), or you may be able to use custom windows
>> for that (they can trigger on every element and return elements
>> immediately, thereby giving you low latency).
>>
>> Can you tell me where exactly the session ID comes from? Is that
>> something that the function with state generates itself?
>> Depending on that answer, I can outline either the window, or the custom
>> state way...
>>
>> Greetings,
>> Stephan
>>
>>
>>
>>
>>
>> On Fri, Nov 27, 2015 at 2:19 PM, Niels Basjes <Niels@basjes.nl> wrote:
>>
>>> Hi,
>>>
>>> Thanks for the explanation.
>>> I have clickstream data arriving in realtime and I need to assign the
>>> visitId and stream it out again (with the visitId now being part of the
>>> record) into Kafka with the lowest possible latency.
>>> Although the Window feature allows me to group and close the visit on a
>>> timeout/expire (as shown to me by Aljoscha in a separate email) it does
>>> make a 'window'.
>>>
>>> So (as requested) I created a ticket for such a feature:
>>> https://issues.apache.org/jira/browse/FLINK-3089
>>>
>>> Niels
>>>
>>>
>>>
>>>
>>>
>>>
>>> On Fri, Nov 27, 2015 at 11:51 AM, Stephan Ewen <sewen@apache.org> wrote:
>>>
>>>> Hi Niels!
>>>>
>>>> Currently, state is released by setting the value for the key to null.
>>>> If you are tracking web sessions, you can try and send an "end of session"
>>>> element that sets the value to null.
>>>>
>>>> To be on the safe side, you probably want state that is automatically
>>>> purged after a while. I would look into using Windows for that. The
>>>> triggers there are flexible so you can schedule both actions on elements
>>>> plus cleanup after a certain time delay (clock time or event time).
>>>>
>>>> The question about "state expiry" has come up a few times. People seem to
>>>> like working on state directly, but it should clean up automatically.
>>>>
>>>> Can you see if your use case fits onto windows, otherwise open a ticket
>>>> for state expiry?
>>>>
>>>> Greetings,
>>>> Stephan
>>>>
>>>>
>>>> On Thu, Nov 26, 2015 at 10:42 PM, Niels Basjes <Niels@basjes.nl> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> I'm working on a streaming application that ingests clickstream data.
>>>>> In a specific part of the flow I need to retain a little bit of state
>>>>> per visitor (i.e. keyBy(sessionid) )
>>>>>
>>>>> So I'm using the Key/Value state interface (i.e. OperatorState<
>>>>> MyRecord>) in a map function.
>>>>>
>>>>> Now in my application I expect to get a huge number of sessions per
>>>>> day.
>>>>> Since these sessionids are 'random' and become unused after the
>>>>> visitor leaves the website, over time the system will have seen
>>>>> millions of those sessionids.
>>>>>
>>>>> So I was wondering: how are these OperatorStates cleaned?
>>>>>
>>>>>
>>>>> --
>>>>> Best regards / Met vriendelijke groeten,
>>>>>
>>>>> Niels Basjes
>>>>>
>>>>
>>>>
>>>
>>>
>>> --
>>> Best regards / Met vriendelijke groeten,
>>>
>>> Niels Basjes
>>>
>>
>>
>
>
> --
> Best regards / Met vriendelijke groeten,
>
> Niels Basjes
>
