Subject: Re: Cleanup of OperatorStates?
From: Stephan Ewen <sewen@apache.org>
To: user@flink.apache.org
Date: Tue, 1 Dec 2015 18:34:03 +0100

Hi!

If you want to run with checkpoints (fault tolerance), you need to specify
a place to store the checkpoints to.

By default, it is the master's memory (or ZooKeeper in HA), so we put a
limit on the size of the state there.

To use larger state, simply configure a different place to store
checkpoints to, and you can grow your state as large as your memory
permits:

env.setStateBackend(new FsStateBackend("hdfs:///data/flink-checkpoints"));

or

env.setStateBackend(new FsStateBackend("file:///data/flink-checkpoints"));

More information on that is in the docs:
https://ci.apache.org/projects/flink/flink-docs-release-0.10/apis/state_backends.html

Greetings,
Stephan
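For context, a minimal sketch of how that configuration fits into a job,
assuming the Flink 0.10-era DataStream API; the checkpoint interval, the
path, and the import locations are illustrative and may differ between
versions:

import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointedJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Take a checkpoint every 5 seconds (interval chosen for illustration).
        env.enableCheckpointing(5000);

        // Keep checkpoint state on a file system instead of in the master's
        // memory, so it is no longer capped by the small in-memory limit
        // (the ~5 MB default visible in the quoted exception below).
        env.setStateBackend(new FsStateBackend("file:///tmp/flink-checkpoints"));

        // Stand-in pipeline; a real job would build the sessionizing topology here.
        env.fromElements(1, 2, 3).print();

        env.execute("Job with file-system checkpoints");
    }
}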
On Tue, Dec 1, 2015 at 5:23 PM, Niels Basjes <Niels@basjes.nl> wrote:

> Hi,
>
> The first thing I noticed is that the Session object maintains a list of
> all events in memory.
> Your events are really small, yet in my scenario the predicted number of
> events per session will be above 1000 and each is expected to be in the
> 512-1024 byte range.
> This worried me, yet I decided to give your code a run.
>
> After a while running it in my IDE (not on a cluster) I got this:
>
> 17:18:46,336 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 269 @ 1448986726336
> 17:18:46,587 INFO  org.apache.flink.runtime.taskmanager.Task - sessionization -> Sink: Unnamed (4/4) switched to FAILED with exception.
> java.lang.RuntimeException: Error triggering a checkpoint as the result of receiving checkpoint barrier
>     at org.apache.flink.streaming.runtime.tasks.StreamTask$1.onEvent(StreamTask.java:577)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask$1.onEvent(StreamTask.java:570)
>     at org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:201)
>     at org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:127)
>     at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:173)
>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:63)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:218)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>     at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.IOException: Size of the state is larger than the maximum permitted memory-backed state. Size=5246277, maxSize=5242880. Consider using a different state backend, like the File System State backend.
>     at org.apache.flink.runtime.state.memory.MemoryStateBackend.checkSize(MemoryStateBackend.java:130)
>     at org.apache.flink.runtime.state.memory.MemoryStateBackend.checkpointStateSerializable(MemoryStateBackend.java:108)
>     at com.dataartisans.streaming.sessionization.SessionizingOperator.snapshotOperatorState(SessionizingOperator.java:162)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:440)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask$1.onEvent(StreamTask.java:574)
>     ... 8 more
>
> Niels
>
> On Tue, Dec 1, 2015 at 4:41 PM, Niels Basjes <Niels@basjes.nl> wrote:
>
>> Thanks!
>> I'm going to study this code closely!
>>
>> Niels
>>
>> On Tue, Dec 1, 2015 at 2:50 PM, Stephan Ewen <sewen@apache.org> wrote:
>>
>>> Hi Niels!
>>>
>>> I have a pretty nice example for you here:
>>> https://github.com/StephanEwen/sessionization
>>>
>>> It keeps only one state and has the structure:
>>>
>>> (source) --> (window sessions) ---> (real time sink)
>>>                     |
>>>                     +--> (15 minute files)
>>>
>>> The real time sink gets the event with the attached visitId
>>> immediately. The session operator, as a side effect, writes out the
>>> 15 minute files with sessions that expired in that time.
>>>
>>> It is not a lot of code; the two main parts are:
>>>
>>> - the program and the program skeleton:
>>> https://github.com/StephanEwen/sessionization/blob/master/src/main/java/com/dataartisans/streaming/sessionization/EventTimeSessionization.java
>>> - the sessionizing and file writing operator:
>>> https://github.com/StephanEwen/sessionization/blob/master/src/main/java/com/dataartisans/streaming/sessionization/SessionizingOperator.java
>>>
>>> The example runs fully on event time, where the timestamps are
>>> extracted from the records. That makes this program very robust (no
>>> issues with clocks, etc.).
>>>
>>> Also, here comes the amazing part: the same program should do "replay"
>>> and real time. The only difference is what input you give it. Since
>>> time is event time, it can do both.
>>>
>>> One note:
>>> - Event time watermarks are the mechanism to signal progress in event
>>> time. It is simple here, because I assume that timestamps are ascending
>>> within a Kafka partition. If that is not the case, you need to
>>> implement a more elaborate TimestampExtractor.
>>>
>>> Hope you can work with this!
>>>
>>> Greetings,
>>> Stephan
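The ascending-timestamps assumption mentioned above boils down to
watermark logic like the following. This is a plain-Java sketch of the
idea only; the class and method names are made up and are not Flink's
TimestampExtractor interface:

// Illustrative only: tracks event timestamps and derives a watermark under
// the assumption that timestamps ascend within a partition.
public class AscendingWatermarkTracker {

    private long maxTimestampSeen = Long.MIN_VALUE;

    /** Record an event's timestamp as it passes through. */
    public void onEvent(long eventTimestamp) {
        // With ascending timestamps, the newest event carries the largest
        // timestamp, so the watermark can safely advance to it.
        maxTimestampSeen = Math.max(maxTimestampSeen, eventTimestamp);
    }

    /**
     * The watermark promises that no event with a smaller timestamp will
     * arrive anymore. With out-of-order data you would subtract an
     * allowed-lateness bound here instead.
     */
    public long currentWatermark() {
        return maxTimestampSeen;
    }
}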
>>>
>>> On Tue, Dec 1, 2015 at 1:00 PM, Stephan Ewen <sewen@apache.org> wrote:
>>>
>>>> Just for clarification: the real-time results should also contain the
>>>> visitId, correct?
>>>>
>>>> On Tue, Dec 1, 2015 at 12:06 PM, Stephan Ewen <sewen@apache.org> wrote:
>>>>
>>>>> Hi Niels!
>>>>>
>>>>> If you want to use the built-in windowing, you probably need two
>>>>> windows:
>>>>> - One for ID assignment (that immediately pipes elements through)
>>>>> - One for accumulating session elements, and then piping them into
>>>>> files upon session end.
>>>>>
>>>>> You may be able to use the rolling file sink (roll by 15 minutes) to
>>>>> store the files.
>>>>> That is probably the simplest to implement and will serve the real
>>>>> time case.
>>>>>
>>>>>                                     +--> (real time sink)
>>>>>                                     |
>>>>> (source) --> (window session ids) --+
>>>>>                                     |
>>>>>                                     +--> (window session) --> (rolling sink)
>>>>>
>>>>> You can put this all into one operator that accumulates the session
>>>>> elements but still immediately emits the new records (the realtime
>>>>> path), if you implement your own windowing/buffering in a custom
>>>>> function.
>>>>> This is also very easy to put onto event time then, which makes it
>>>>> valuable for processing the history (replay). For this second case I
>>>>> am still prototyping some code for the event time case; give me a
>>>>> bit, I'll get back to you...
>>>>>
>>>>> Greetings,
>>>>> Stephan
>>>>>
>>>>> On Tue, Dec 1, 2015 at 10:55 AM, Niels Basjes <Niels@basjes.nl> wrote:
>>>>>
>>>>>> Hi Stephan,
>>>>>>
>>>>>> I created a first version of the Visit ID assignment like this:
>>>>>>
>>>>>> First I group by sessionid and I create a Window per visit.
>>>>>> The custom Trigger for this window does a 'FIRE' after each element
>>>>>> and sets an EventTimer on the 'next possible moment the visit can
>>>>>> expire'.
>>>>>> To avoid getting 'all events' in the visit after every 'FIRE' I'm
>>>>>> using CountEvictor.of(1).
>>>>>> When the visit expires I do a PURGE. So if there are more events
>>>>>> afterwards for the same sessionId I get a new visit (which is
>>>>>> exactly what I want).
>>>>>>
>>>>>> The last step is that I want to have a 'normal' DataStream again to
>>>>>> work with.
>>>>>> I created this WindowFunction to map the windowed stream back to a
>>>>>> normal DataStream.
>>>>>> Essentially I do this:
>>>>>>
>>>>>> DataStream<Foo> visitDataStream = visitWindowedStream.apply(new WindowToStream<Foo>());
>>>>>>
>>>>>> // This is an identity 'apply'
>>>>>> private static class WindowToStream<T> implements WindowFunction<T, T, String, GlobalWindow> {
>>>>>>     @Override
>>>>>>     public void apply(String s, GlobalWindow window, Iterable<T> values, Collector<T> out) throws Exception {
>>>>>>         for (T value : values) {
>>>>>>             out.collect(value);
>>>>>>         }
>>>>>>     }
>>>>>> }
>>>>>>
>>>>>> The problem with this is that I first create the visitIds in a
>>>>>> Window (great).
>>>>>> Because I really need to have both the windowed events AND the near
>>>>>> realtime version, I currently break down the Window to get the
>>>>>> single events, and after that I have to recreate the same Window
>>>>>> again.
>>>>>>
>>>>>> I'm looking forward to the implementation direction you are
>>>>>> referring to. I hope you have a better way of doing this.
>>>>>>
>>>>>> Niels Basjes
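For illustration, the visit-ID rule described above (a new visit after a
gap of inactivity per sessionId) reduces to a few lines of plain Java,
independent of the windowing machinery; all names here are made up:

import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Illustrative only: assigns a visit id per session id, starting a new
// visit whenever more than `gapMillis` passes between consecutive events.
public class VisitIdAssigner {

    private static final class Visit {
        String visitId;
        long lastEventTime;
    }

    private final long gapMillis;
    private final Map<String, Visit> visits = new HashMap<>();

    public VisitIdAssigner(long gapMillis) {
        this.gapMillis = gapMillis;
    }

    /** Returns the visit id for an event, opening a new visit after a gap. */
    public String visitIdFor(String sessionId, long eventTime) {
        Visit v = visits.get(sessionId);
        if (v == null || eventTime - v.lastEventTime > gapMillis) {
            v = new Visit();
            v.visitId = UUID.randomUUID().toString(); // a (sort of) random id
            visits.put(sessionId, v);
        }
        v.lastEventTime = eventTime;
        return v.visitId;
    }
}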
>>>>>>
>>>>>> On Mon, Nov 30, 2015 at 9:29 PM, Stephan Ewen <sewen@apache.org> wrote:
>>>>>>
>>>>>>> Hi Niels!
>>>>>>>
>>>>>>> Nice use case that you have!
>>>>>>> I think you can solve this super nicely with Flink, such that
>>>>>>> "replay" and "realtime" are literally the same program - they
>>>>>>> differ only in whether you read the Kafka topic from the beginning
>>>>>>> or attach to its end.
>>>>>>>
>>>>>>> Event time is, like you said, the key thing for "replay". Event
>>>>>>> time depends on the progress in the timestamps of the data, so it
>>>>>>> can progress at different speeds, depending on what the rate of
>>>>>>> your stream is.
>>>>>>> With the appropriate data source, it will progress very fast in
>>>>>>> "replay mode", so that you replay at "fast forward speed", and it
>>>>>>> progresses at the same speed as processing time when you attach to
>>>>>>> the end of the Kafka queue.
>>>>>>>
>>>>>>> When you define the time intervals in your program to react to
>>>>>>> event time progress, then you will compute the right sessionization
>>>>>>> in both replay and real time settings.
>>>>>>>
>>>>>>> I am writing a little example code to share. The type of
>>>>>>> ID-assignment sessions you want to do needs an undocumented API
>>>>>>> right now, so I'll prepare something there for you...
>>>>>>>
>>>>>>> Greetings,
>>>>>>> Stephan
>>>>>>>
>>>>>>> On Sun, Nov 29, 2015 at 4:04 PM, Niels Basjes <Niels@basjes.nl> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> The sessionid is present in the measurements. It can also be seen
>>>>>>>> as a form of 'browser id'.
>>>>>>>> Most websites use either a 'long-lived random value in a cookie'
>>>>>>>> or an 'application session id' for this.
>>>>>>>>
>>>>>>>> So with the id of the browser in hand I have the need to group all
>>>>>>>> events into "periods of activity" which I call a visit.
>>>>>>>> Such a visit is a bounded subset of all events from a single
>>>>>>>> browser.
>>>>>>>>
>>>>>>>> What I need is to add a (sort of) random visit id to the events
>>>>>>>> that becomes 'inactive' after more than X minutes of inactivity.
>>>>>>>> I then want to add this visitid to each event and
>>>>>>>> 1) stream them out in realtime
>>>>>>>> 2) wait till the visit ends and store the complete visit on disk
>>>>>>>> (I am going for either Avro or Parquet).
>>>>>>>>
>>>>>>>> I want to create disk files with all visits that ended in a
>>>>>>>> specific time period. So essentially
>>>>>>>> "Group by round(<timestamp of last event>, 15 minutes)"
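That rounding is simple integer arithmetic on the timestamp; a tiny
sketch, assuming millisecond epoch timestamps and 15-minute buckets:

// Illustrative: map a visit's last-event timestamp (ms since epoch) to the
// start of its 15-minute bucket, so all visits ending in the same quarter
// hour land in the same output file.
public final class TimeBuckets {

    private static final long BUCKET_MS = 15 * 60 * 1000L;

    public static long bucketStart(long lastEventTimestampMs) {
        return (lastEventTimestampMs / BUCKET_MS) * BUCKET_MS;
    }

    public static void main(String[] args) {
        // The timestamp from the log above (17:18:46 local time) falls
        // into the 17:15 bucket.
        System.out.println(bucketStart(1448986726336L)); // -> 1448986500000
    }
}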
>>>>>>>>
>>>>>>>> Because of the need to be able to 'repair' things I came up with
>>>>>>>> the following question:
>>>>>>>> In the Flink API I see the 'processing time' (i.e. the actual time
>>>>>>>> of the server) and the 'event time' (i.e. the time when an event
>>>>>>>> was recorded).
>>>>>>>>
>>>>>>>> Now in my case all events are in Kafka (for, say, 2 weeks).
>>>>>>>> When something goes wrong I want to be able to 'reprocess'
>>>>>>>> everything from the start of the queue.
>>>>>>>> Here the matter of 'event time' becomes a big question for me; in
>>>>>>>> those 'replay' situations the event time will progress at a much
>>>>>>>> higher speed than the normal 1 sec/sec.
>>>>>>>>
>>>>>>>> How does this work in Apache Flink?
>>>>>>>>
>>>>>>>> Niels Basjes
>>>>>>>>
>>>>>>>> On Fri, Nov 27, 2015 at 3:28 PM, Stephan Ewen <sewen@apache.org> wrote:
>>>>>>>>
>>>>>>>>> Hey Niels!
>>>>>>>>>
>>>>>>>>> You may be able to implement this in windows anyway, depending on
>>>>>>>>> your setup. You can definitely implement state with timeout
>>>>>>>>> yourself (using the more low-level state interface), or you may
>>>>>>>>> be able to use custom windows for that (they can trigger on every
>>>>>>>>> element and return elements immediately, thereby giving you low
>>>>>>>>> latency).
>>>>>>>>>
>>>>>>>>> Can you tell me where exactly the session ID comes from? Is that
>>>>>>>>> something that the function with state generates itself?
>>>>>>>>> Depending on that answer, I can outline either the window way or
>>>>>>>>> the custom state way...
>>>>>>>>>
>>>>>>>>> Greetings,
>>>>>>>>> Stephan
>>>>>>>>>
>>>>>>>>> On Fri, Nov 27, 2015 at 2:19 PM, Niels Basjes <Niels@basjes.nl> wrote:
>>>>>>>>>
>>>>>>>>>> Hi,
>>>>>>>>>>
>>>>>>>>>> Thanks for the explanation.
>>>>>>>>>> I have clickstream data arriving in realtime and I need to
>>>>>>>>>> assign the visitId and stream it out again (with the visitId now
>>>>>>>>>> being part of the record) into Kafka with the lowest possible
>>>>>>>>>> latency.
>>>>>>>>>> Although the Window feature allows me to group and close the
>>>>>>>>>> visit on a timeout/expiry (as shown to me by Aljoscha in a
>>>>>>>>>> separate email), it does make a 'window'.
>>>>>>>>>>
>>>>>>>>>> So (as requested) I created a ticket for such a feature:
>>>>>>>>>> https://issues.apache.org/jira/browse/FLINK-3089
>>>>>>>>>>
>>>>>>>>>> Niels
>>>>>>>>>>
>>>>>>>>>> On Fri, Nov 27, 2015 at 11:51 AM, Stephan Ewen <sewen@apache.org> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Niels!
>>>>>>>>>>>
>>>>>>>>>>> Currently, state is released by setting the value for the key
>>>>>>>>>>> to null. If you are tracking web sessions, you can try to send
>>>>>>>>>>> an "end of session" element that sets the value to null.
>>>>>>>>>>>
>>>>>>>>>>> To be on the safe side, you probably want state that is
>>>>>>>>>>> automatically purged after a while. I would look into using
>>>>>>>>>>> windows for that. The triggers there are flexible, so you can
>>>>>>>>>>> schedule both actions on elements plus cleanup after a certain
>>>>>>>>>>> time delay (clock time or event time).
>>>>>>>>>>>
>>>>>>>>>>> The question about "state expiry" has come up a few times.
>>>>>>>>>>> People seem to like working on state directly, but it should
>>>>>>>>>>> clean up automatically.
>>>>>>>>>>>
>>>>>>>>>>> Can you see if your use case fits onto windows, and otherwise
>>>>>>>>>>> open a ticket for state expiry?
>>>>>>>>>>>
>>>>>>>>>>> Greetings,
>>>>>>>>>>> Stephan
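Sketched out, the release-by-null pattern could look roughly like this in
a keyed map function. The OperatorState interface is the one discussed in
this thread, but the getKeyValueState registration call and the
surrounding types are assumptions modeled on the Flink 0.10 era and may
differ between versions:

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.OperatorState;
import org.apache.flink.configuration.Configuration;

// ClickEvent and MyRecord are hypothetical placeholder types for this sketch.
public class SessionTracker extends RichMapFunction<ClickEvent, ClickEvent> {

    private OperatorState<MyRecord> sessionState;

    @Override
    public void open(Configuration parameters) throws Exception {
        // Assumed 0.10-era registration call; the default value is null.
        sessionState = getRuntimeContext().getKeyValueState("session", MyRecord.class, null);
    }

    @Override
    public ClickEvent map(ClickEvent event) throws Exception {
        if (event.endOfSession) {
            // Setting the value for this key back to null releases the state.
            sessionState.update(null);
        } else {
            MyRecord record = sessionState.value();
            if (record == null) {
                record = new MyRecord();
            }
            record.eventCount++;
            sessionState.update(record);
        }
        return event;
    }
}

class ClickEvent {
    public String sessionId;
    public boolean endOfSession;
}

class MyRecord {
    public long eventCount;
}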
>>>>>>>>>>>
>>>>>>>>>>> On Thu, Nov 26, 2015 at 10:42 PM, Niels Basjes <Niels@basjes.nl> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi,
>>>>>>>>>>>>
>>>>>>>>>>>> I'm working on a streaming application that ingests
>>>>>>>>>>>> clickstream data.
>>>>>>>>>>>> In a specific part of the flow I need to retain a little bit
>>>>>>>>>>>> of state per visitor (i.e. keyBy(sessionid)).
>>>>>>>>>>>>
>>>>>>>>>>>> So I'm using the Key/Value state interface (i.e.
>>>>>>>>>>>> OperatorState<MyRecord>) in a map function.
>>>>>>>>>>>>
>>>>>>>>>>>> Now in my application I expect to get a huge number of
>>>>>>>>>>>> sessions per day.
>>>>>>>>>>>> Since these sessionids are 'random' and become unused after
>>>>>>>>>>>> the visitor leaves the website, over time the system will have
>>>>>>>>>>>> seen millions of those sessionids.
>>>>>>>>>>>>
>>>>>>>>>>>> So I was wondering: how are these OperatorStates cleaned up?
>>>>>>>>>>>>
>>>>>>>>>>>> --
>>>>>>>>>>>> Best regards / Met vriendelijke groeten,
>>>>>>>>>>>>
>>>>>>>>>>>> Niels Basjes
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> --
>>>>>>>>>> Best regards / Met vriendelijke groeten,
>>>>>>>>>>
>>>>>>>>>> Niels Basjes
>>>>>>>>>
>>>>>>>>
>>>>>>>> --
>>>>>>>> Best regards / Met vriendelijke groeten,
>>>>>>>>
>>>>>>>> Niels Basjes
>>>>>>>
>>>>>>
>>>>>> --
>>>>>> Best regards / Met vriendelijke groeten,
>>>>>>
>>>>>> Niels Basjes
>>>>>
>>>>
>>>
>>
>> --
>> Best regards / Met vriendelijke groeten,
>>
>> Niels Basjes
>
> --
> Best regards / Met vriendelijke groeten,
>
> Niels Basjes