flink-user mailing list archives

From Niels Basjes <Ni...@basjes.nl>
Subject Re: Cleanup of OperatorStates?
Date Tue, 01 Dec 2015 15:41:18 GMT
Thanks!
I'm going to study this code closely!

Niels

On Tue, Dec 1, 2015 at 2:50 PM, Stephan Ewen <sewen@apache.org> wrote:

> Hi Niels!
>
> I have a pretty nice example for you here:
> https://github.com/StephanEwen/sessionization
>
> It keeps only one state and has the structure:
>
>
> (source) --> (window sessions) ---> (real time sink)
>                       |
>                       +--> (15 minute files)
>
>
> The real time sink gets the event with attached visitId immediately. The
> session operator, as a side effect, writes out the 15 minute files with
> sessions that expired in that time.
>
>
> It is not a lot of code, the two main parts are
>
>   - the program and the program skeleton:
> https://github.com/StephanEwen/sessionization/blob/master/src/main/java/com/dataartisans/streaming/sessionization/EventTimeSessionization.java
>   - the sessionizing and file writing operator:
> https://github.com/StephanEwen/sessionization/blob/master/src/main/java/com/dataartisans/streaming/sessionization/SessionizingOperator.java
>
>
> The example runs fully on event time, where the timestamps are extracted
> from the records. That makes this program very robust (no issue with
> clocks, etc).
>
> Also, here comes the amazing part: The same program should do "replay" and
> real time. The only difference is what input you give it. Since time is
> event time, it can do both.
>
>
> One note:
>   - Event Time Watermarks are the mechanism to signal progress in event
> time. It is simple here, because I assume that timestamps are ascending in
> a Kafka partition. If that is not the case, you need to implement a more
> elaborate TimestampExtractor.
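
To make the ascending-timestamps assumption concrete, here is a minimal plain-Java sketch. It is not Flink's TimestampExtractor interface; the class `AscendingWatermarkTracker` and its methods are hypothetical names for illustration. The idea is only that, when timestamps never decrease within a partition, the latest timestamp seen can serve directly as the event-time progress signal, and any smaller timestamp shows that the assumption does not hold.

```java
/** Hypothetical sketch: tracks event-time progress for a single partition. */
final class AscendingWatermarkTracker {

    private long watermark = Long.MIN_VALUE;
    private long outOfOrderCount = 0;

    /**
     * Records an event timestamp and returns the watermark afterwards.
     * Assumes timestamps are ascending; an older timestamp does not move
     * the watermark backwards and is counted as a violation instead.
     */
    long onEvent(long timestampMillis) {
        if (timestampMillis < watermark) {
            outOfOrderCount++;
        } else {
            watermark = timestampMillis;
        }
        return watermark;
    }

    /** Number of events seen so far that broke the ascending assumption. */
    long outOfOrderCount() {
        return outOfOrderCount;
    }
}
```

If `outOfOrderCount()` is ever non-zero for real data, that is the case where a more elaborate extractor is needed.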
>
>
> Hope you can work with this!
>
> Greetings,
> Stephan
>
>
> On Tue, Dec 1, 2015 at 1:00 PM, Stephan Ewen <sewen@apache.org> wrote:
>
>> Just for clarification: The real-time results should also contain the
>> visitId, correct?
>>
>> On Tue, Dec 1, 2015 at 12:06 PM, Stephan Ewen <sewen@apache.org> wrote:
>>
>>> Hi Niels!
>>>
>>> If you want to use the built-in windowing, you probably need two windows:
>>>   - One for ID assignment (that immediately pipes elements through)
>>>   - One for accumulating session elements, and then piping them into
>>> files upon session end.
>>>
>>> You may be able to use the rolling file sink (roll by 15 minutes) to
>>> store the files.
>>> That is probably the simplest to implement and will serve the real time
>>> case.
>>>
>>>
>>>                                     +--> (real time sink)
>>>                                     |
>>> (source) --> (window session ids) --+
>>>                                     |
>>>                                     +--> (window session) --> (rolling sink)
>>>
>>>
>>> You can put this all into one operator that accumulates the session
>>> elements but still immediately emits the new records (the realtime path),
>>> if you implement your own windowing/buffering in a custom function.
>>> This is also very easy to put onto event time then, which makes it
>>> valuable for processing the history (replay). For this second case, I am
>>> still prototyping some code for event time; give me a bit, and I'll get
>>> back to you...
>>>
>>> Greetings,
>>> Stephan
>>>
>>>
>>> On Tue, Dec 1, 2015 at 10:55 AM, Niels Basjes <Niels@basjes.nl> wrote:
>>>
>>>> Hi Stephan,
>>>>
>>>> I created a first version of the Visit ID assignment like this:
>>>>
>>>> First I group by sessionid and I create a Window per visit.
>>>> The custom Trigger for this window does a 'FIRE' after each element and
>>>> sets an EventTimer on the 'next possible moment the visit can expire'.
>>>> To avoid getting 'all events' in the visit after every 'FIRE' I'm using
>>>> CountEvictor.of(1).
>>>> When the visit expires I do a PURGE. So if there are more events
>>>> afterwards for the same sessionId I get a new visit (which is exactly what
>>>> I want).
>>>>
>>>> The last step I do is I want to have a 'normal' DataStream again to
>>>> work with.
>>>> I created this WindowFunction to map the Window stream back to a normal
>>>> DataStream.
>>>> Essentially I do this:
>>>>
>>>> DataStream<Foo> visitDataStream =
>>>>     visitWindowedStream.apply(new WindowToStream<Foo>());
>>>>
>>>> // This is an identity 'apply': it forwards every element unchanged.
>>>> private static class WindowToStream<T>
>>>>         implements WindowFunction<T, T, String, GlobalWindow> {
>>>>     @Override
>>>>     public void apply(String key, GlobalWindow window, Iterable<T> values,
>>>>                       Collector<T> out) throws Exception {
>>>>         for (T value : values) {
>>>>             out.collect(value);
>>>>         }
>>>>     }
>>>> }
>>>>
>>>>
>>>> The problem with this is that I first create the visitIds in a Window
>>>> (great).
>>>> Because I really need to have both the Windowed events AND the near
>>>> realtime version I currently break down the Window to get the single events
>>>> and after that I have to recreate the same Window again.
>>>>
>>>> I'm looking forward to the implementation direction you are referring
>>>> to. I hope you have a better way of doing this.
>>>>
>>>> Niels Basjes
>>>>
>>>>
>>>> On Mon, Nov 30, 2015 at 9:29 PM, Stephan Ewen <sewen@apache.org> wrote:
>>>>
>>>>> Hi Niels!
>>>>>
>>>>> Nice use case that you have!
>>>>> I think you can solve this super nicely with Flink, such that "replay"
>>>>> and "realtime" are literally the same program - they differ only in whether
>>>>>
>>>>> Event time is, like you said, the key thing for "replay". Event time
>>>>> depends on the progress in the timestamps of the data, so it can progress
>>>>> at different speeds, depending on what the rate of your stream is.
>>>>> With the appropriate data source, it will progress very fast in
>>>>> "replay mode", so that you replay in "fast forward speed", and it
>>>>> progresses at the same speed as processing time when you attach to the
>>>>> end of the Kafka queue.
>>>>>
>>>>> When you define the time intervals in your program to react to event
>>>>> time progress, then you will compute the right sessionization in both
>>>>> replay and real time settings.
>>>>>
>>>>> I am writing some example code to share. The type of ID-assignment
>>>>> sessions you want to do needs an undocumented API right now, so I'll prepare
>>>>> something there for you...
>>>>>
>>>>> Greetings,
>>>>> Stephan
>>>>>
>>>>>
>>>>>
>>>>> On Sun, Nov 29, 2015 at 4:04 PM, Niels Basjes <Niels@basjes.nl> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> The sessionid is present in the measurements. It can also be seen as
>>>>>> a form of 'browser id'.
>>>>>> Most websites use either a 'long lived random value in a cookie' or an
>>>>>> 'application session id' for this.
>>>>>>
>>>>>> So with the id of the browser in hand I have the need to group all
>>>>>> events into "periods of activity" which I call a visit.
>>>>>> Such a visit is a bounded subset of all events from a single browser.
>>>>>>
>>>>>> What I need is to add a (sort of) random visit id to the events that
>>>>>> becomes 'inactive' after more than X minutes of inactivity.
>>>>>> I then want to add this visitid to each event and
>>>>>> 1) stream them out in realtime
>>>>>> 2) Wait till the visit ends and store the complete visit on disk (I
>>>>>> am going for either AVRO or Parquet).
>>>>>>
>>>>>> I want to create diskfiles with all visits that ended in a specific
>>>>>> time period. So essentially
>>>>>>         "Group by round(<timestamp of last event>, 15 minutes)"
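
The visit logic described above can be sketched in plain Java, independent of any Flink API. Everything here is an assumption made for illustration: the class name `VisitAssigner`, the use of a random UUID as the "(sort of) random" visit id, and reading "round" as rounding down to the start of the 15-minute period. It only covers assigning the id on the realtime path and computing the file bucket; emitting completed visits is left out.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

/** Hypothetical sketch: assigns a visit id per sessionid using an inactivity gap. */
final class VisitAssigner {

    /** The visit that is currently open for one sessionid. */
    private static final class Visit {
        final String visitId;
        long lastEventTime;

        Visit(String visitId, long lastEventTime) {
            this.visitId = visitId;
            this.lastEventTime = lastEventTime;
        }
    }

    private static final long BUCKET_MILLIS = 15L * 60L * 1000L;

    private final long timeoutMillis;
    private final Map<String, Visit> openVisits = new HashMap<>();

    VisitAssigner(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    /**
     * Returns the visit id for an event. A new visit starts when the sessionid
     * is unknown or when the gap since its last event exceeds the timeout.
     */
    String assign(String sessionId, long eventTimeMillis) {
        Visit visit = openVisits.get(sessionId);
        if (visit == null || eventTimeMillis - visit.lastEventTime > timeoutMillis) {
            visit = new Visit(UUID.randomUUID().toString(), eventTimeMillis);
            openVisits.put(sessionId, visit);
        } else {
            visit.lastEventTime = Math.max(visit.lastEventTime, eventTimeMillis);
        }
        return visit.visitId;
    }

    /** Start of the 15-minute period that contains the visit's last event. */
    static long bucketStart(long lastEventTimeMillis) {
        return Math.floorDiv(lastEventTimeMillis, BUCKET_MILLIS) * BUCKET_MILLIS;
    }
}
```

Because the decision depends only on the timestamps carried by the events, the same logic gives the same visit ids whether the events arrive live or are replayed from the start of the queue.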
>>>>>>
>>>>>>
>>>>>> Because of the need to be able to 'repair' things I came up with the
>>>>>> following question:
>>>>>> In the Flink API I see the 'process time' (i.e. the actual time of
>>>>>> the server) and the 'event time' (i.e. the time when an event was
>>>>>> recorded).
>>>>>>
>>>>>> Now in my case all events are in Kafka (for say 2 weeks).
>>>>>> When something goes wrong I want to be able to 'reprocess' everything
>>>>>> from the start of the queue.
>>>>>> Here the matter of 'event time' becomes a big question for me: in
>>>>>> those 'replay' situations the event time will progress at a much higher
>>>>>> speed than the normal 1sec/sec.
>>>>>>
>>>>>> How does this work in Apache Flink?
>>>>>>
>>>>>>
>>>>>> Niels Basjes
>>>>>>
>>>>>>
>>>>>> On Fri, Nov 27, 2015 at 3:28 PM, Stephan Ewen <sewen@apache.org>
>>>>>> wrote:
>>>>>>
>>>>>>> Hey Niels!
>>>>>>>
>>>>>>> You may be able to implement this in windows anyways, depending on
>>>>>>> your setup. You can definitely implement state with timeout yourself (using
>>>>>>> the more low-level state interface), or you may be able to use custom
>>>>>>> windows for that (they can trigger on every element and return elements
>>>>>>> immediately, thereby giving you low latency).
>>>>>>>
>>>>>>> Can you tell me where exactly the session ID comes from? Is that
>>>>>>> something that the function with state generates itself?
>>>>>>> Depending on that answer, I can outline either the window, or the
>>>>>>> custom state way...
>>>>>>>
>>>>>>> Greetings,
>>>>>>> Stephan
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Fri, Nov 27, 2015 at 2:19 PM, Niels Basjes <Niels@basjes.nl>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> Thanks for the explanation.
>>>>>>>> I have clickstream data arriving in realtime and I need to assign
>>>>>>>> the visitId and stream it out again (with the visitId now being part of the
>>>>>>>> record) into Kafka with the lowest possible latency.
>>>>>>>> Although the Window feature allows me to group and close the visit
>>>>>>>> on a timeout/expire (as shown to me by Aljoscha in a separate email) it
>>>>>>>> does make a 'window'.
>>>>>>>>
>>>>>>>> So (as requested) I created a ticket for such a feature:
>>>>>>>> https://issues.apache.org/jira/browse/FLINK-3089
>>>>>>>>
>>>>>>>> Niels
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Fri, Nov 27, 2015 at 11:51 AM, Stephan Ewen <sewen@apache.org>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi Niels!
>>>>>>>>>
>>>>>>>>> Currently, state is released by setting the value for the key to
>>>>>>>>> null. If you are tracking web sessions, you can try and send an "end of
>>>>>>>>> session" element that sets the value to null.
>>>>>>>>>
>>>>>>>>> To be on the safe side, you probably want state that is
>>>>>>>>> automatically purged after a while. I would look into using Windows for
>>>>>>>>> that. The triggers there are flexible so you can schedule both actions on
>>>>>>>>> elements plus cleanup after a certain time delay (clock time or event time).
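
As a rough illustration of the two cleanup ideas above (releasing state by setting it to null, and purging it after a delay), here is a plain-Java sketch. It does not use Flink's state or window interfaces; the class `ExpiringKeyedState` and its methods are hypothetical names chosen for this example. The caller supplies `now`, which could be either clock time or event time.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

/** Hypothetical sketch: per-key state that is dropped after a period without updates. */
final class ExpiringKeyedState<K, V> {

    /** A stored value together with the time of its last update. */
    private static final class Slot<T> {
        final T value;
        final long lastUpdate;

        Slot(T value, long lastUpdate) {
            this.value = value;
            this.lastUpdate = lastUpdate;
        }
    }

    private final long ttlMillis;
    private final Map<K, Slot<V>> slots = new HashMap<>();

    ExpiringKeyedState(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    /** Stores a value for the key; a null value releases the key's state. */
    void update(K key, V value, long now) {
        if (value == null) {
            slots.remove(key);
        } else {
            slots.put(key, new Slot<>(value, now));
        }
    }

    /** Returns the current value for the key, or null if there is none. */
    V get(K key) {
        Slot<V> slot = slots.get(key);
        return slot == null ? null : slot.value;
    }

    /** Removes every key whose last update is older than the TTL; returns how many were removed. */
    int purgeExpired(long now) {
        int purged = 0;
        Iterator<Map.Entry<K, Slot<V>>> it = slots.entrySet().iterator();
        while (it.hasNext()) {
            if (now - it.next().getValue().lastUpdate > ttlMillis) {
                it.remove();
                purged++;
            }
        }
        return purged;
    }

    /** Number of keys that currently hold state. */
    int size() {
        return slots.size();
    }
}
```

The explicit `update(key, null, now)` path corresponds to the "end of session" element; `purgeExpired` is the safety net for sessions that never send one.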
>>>>>>>>>
>>>>>>>>> The question about "state expiry" has come up a few times. People
>>>>>>>>> seem to like working on state directly, but it should clean up
>>>>>>>>> automatically.
>>>>>>>>>
>>>>>>>>> Can you see if your use case fits onto windows, otherwise open a
>>>>>>>>> ticket for state expiry?
>>>>>>>>>
>>>>>>>>> Greetings,
>>>>>>>>> Stephan
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Thu, Nov 26, 2015 at 10:42 PM, Niels Basjes <Niels@basjes.nl>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Hi,
>>>>>>>>>>
>>>>>>>>>> I'm working on a streaming application that ingests clickstream
>>>>>>>>>> data.
>>>>>>>>>> In a specific part of the flow I need to retain a little bit of
>>>>>>>>>> state per visitor (i.e. keyBy(sessionid) )
>>>>>>>>>>
>>>>>>>>>> So I'm using the Key/Value state interface (i.e.
>>>>>>>>>> OperatorState<MyRecord>) in a map function.
>>>>>>>>>>
>>>>>>>>>> Now in my application I expect to get a huge number of sessions
>>>>>>>>>> per day.
>>>>>>>>>> Since these sessionids are 'random' and become unused after the
>>>>>>>>>> visitor leaves the website, over time the system will have seen millions of
>>>>>>>>>> those sessionids.
>>>>>>>>>>
>>>>>>>>>> So I was wondering: how are these OperatorStates cleaned?
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> --
>>>>>>>>>> Best regards / Met vriendelijke groeten,
>>>>>>>>>>
>>>>>>>>>> Niels Basjes


-- 
Best regards / Met vriendelijke groeten,

Niels Basjes
