flink-user mailing list archives

From Aljoscha Krettek <aljoscha.kret...@gmail.com>
Subject Re: Working with the Windowing functionality
Date Fri, 27 Nov 2015 15:11:25 GMT
Hi,
Yes, you are right in your analysis. Did you try running it with the timer being set on every element?
Maybe it's not the bottleneck of the computation. I would be very interested in seeing how
this behaves, since I only tested with regular time windows, where the first if statement in
registerEventTimeTimer() almost always returns directly, which is very cheap.
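To make the cost difference concrete, here is a minimal single-key sketch in plain Java (no Flink dependency) of the timer bookkeeping in the registerEventTimeTimer() code quoted below. The class and helper names are hypothetical; only the watermarkTimer/watermarkTimers logic and the `if (time == watermarkTimer)` firing check mirror the quoted snippets, and everything else the real WindowOperator does is left out.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;

// Single-key model of the quoted WindowOperator timer bookkeeping (not Flink code).
public class TimerBookkeepingSketch {
    private long watermarkTimer = -1;                         // latest time registered by this context
    private final Map<Long, Set<Object>> watermarkTimers = new HashMap<>();
    private final Object context = new Object();              // stands in for the trigger Context

    public void registerEventTimeTimer(long time) {
        if (watermarkTimer == time) {
            return;                                           // cheap path: same time as the last call
        }
        watermarkTimers.computeIfAbsent(time, t -> new HashSet<>()).add(context);
        watermarkTimer = time;                                // older entries remain in the map
    }

    // Removes every entry up to the watermark; returns how many calls reach the trigger.
    public int advanceWatermark(long watermark) {
        int forwarded = 0;
        Iterator<Map.Entry<Long, Set<Object>>> it = watermarkTimers.entrySet().iterator();
        while (it.hasNext()) {
            long time = it.next().getKey();
            if (time <= watermark) {
                it.remove();
                if (time == watermarkTimer) {                 // only the latest registration is forwarded
                    watermarkTimer = -1;
                    forwarded++;
                }
            }
        }
        return forwarded;
    }

    public int pendingEntries() {
        return watermarkTimers.size();
    }

    // Regular time window: every element registers the same window end.
    public static int entriesForFixedWindowEnd(int elements, long windowEnd) {
        TimerBookkeepingSketch op = new TimerBookkeepingSketch();
        for (int i = 0; i < elements; i++) {
            op.registerEventTimeTimer(windowEnd);
        }
        return op.pendingEntries();
    }

    // Idle timeout: every element (one per second) moves the timer to its own timestamp + timeout.
    public static int entriesForMovingTimeout(int elements, long timeout) {
        TimerBookkeepingSketch op = new TimerBookkeepingSketch();
        for (int i = 0; i < elements; i++) {
            op.registerEventTimeTimer(1_000L * i + timeout);
        }
        return op.pendingEntries();
    }

    public static void main(String[] args) {
        System.out.println(entriesForFixedWindowEnd(1000, 60_000L));   // 1
        System.out.println(entriesForMovingTimeout(1000, 1_800_000L)); // 1000
    }
}
```

With a fixed window end every element takes the early return, so a single entry exists; with an idle timeout every element adds an entry, and once the watermark passes them all, only one call is forwarded to the trigger.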

Cheers,
Aljoscha
> On 27 Nov 2015, at 13:59, Niels Basjes <Niels@basjes.nl> wrote:
> 
> Hi,
> 
> Thanks for all this input.
> I didn't know about this comment in your example:
>       // a trigger can only have 1 timer so we remove the old trigger when setting the new one
> 
> This insight is of major importance to me.
> Let me explain:
> I found this code in the WindowOperator:
> 
> @Override
> public void registerEventTimeTimer(long time) {
>    if (watermarkTimer == time) {
>       // we already have set a trigger for that time
>       return;
>    }
>    Set<Context> triggers = watermarkTimers.get(time);
>    if (triggers == null) {
>       triggers = new HashSet<>();
>       watermarkTimers.put(time, triggers);
>    }
>    this.watermarkTimer = time;
>    triggers.add(this);
> }
> 
> and
> 
> if (time == watermarkTimer) {
>    watermarkTimer = -1;
>    Trigger.TriggerResult firstTriggerResult = trigger.onEventTime(time, window, this);
> 
> Effectively, every new value is stored in watermarkTimers and later processed, but at the moment an older timer fires, the call is not forwarded into the application, because its time no longer equals watermarkTimer.
> So if I did it as you show in your example, I would end up with as many trigger entries in the watermarkTimers map as I have seen events.
> My application will handle about 50K events/sec in total, resulting in thousands of 'onEventTime' calls per second.
> 
> So thank you. I now understand that I have to be more careful with these timers!
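One way to reduce the number of timer entries (an assumption on my part, not something proposed in this thread) is to coalesce deadlines: round each idle-timeout deadline up to a coarse boundary, so that consecutive clicks of the same visit register the same time and take the early-return path in the quoted registerEventTimeTimer(). The helper below is hypothetical plain Java; the price is that a window may fire up to one granularity step later than the exact timeout.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical helper, not a Flink API: rounds an idle-timeout deadline up to a
// coarse boundary so that many elements register the same timer time.
public class CoalescedTimeoutSketch {

    // Deadline = lastSeen + timeout, rounded up to the next multiple of granularity.
    public static long coalescedDeadline(long lastSeen, long timeout, long granularity) {
        if (granularity <= 0) {
            throw new IllegalArgumentException("granularity must be positive");
        }
        long raw = lastSeen + timeout;
        long remainder = Math.floorMod(raw, granularity);
        return remainder == 0 ? raw : raw + (granularity - remainder);
    }

    // Counts the distinct timer times that the given element timestamps would register.
    public static int distinctTimers(long[] timestamps, long timeout, long granularity) {
        Set<Long> times = new HashSet<>();
        for (long ts : timestamps) {
            times.add(coalescedDeadline(ts, timeout, granularity));
        }
        return times.size();
    }

    public static void main(String[] args) {
        long timeout = 30 * 60 * 1000L;              // 30 minutes of inactivity
        long granularity = 60 * 1000L;               // fire at most one minute late
        long[] oneClickPerSecond = new long[600];    // ten minutes of clicks
        for (int i = 0; i < oneClickPerSecond.length; i++) {
            oneClickPerSecond[i] = 1_000L * i;
        }
        System.out.println(distinctTimers(oneClickPerSecond, timeout, 1L));          // 600
        System.out.println(distinctTimers(oneClickPerSecond, timeout, granularity)); // 11
    }
}
```

The granularity is a trade-off: a larger value means fewer distinct timers but a less precise idle timeout.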
> 
> Niels Basjes
> 
> 
> 
> On Fri, Nov 27, 2015 at 11:28 AM, Aljoscha Krettek <aljoscha@apache.org> wrote:
> Hi Niels,
> Do the records that arrive from Kafka already have the session ID, or do you want to assign it inside your Flink job based on the idle timeout?
> 
> For the rest of your problems you should be able to get by with what Flink provides:
> 
> The triggering can be done using a custom Trigger that fires after we haven’t seen an element for 30 minutes.
> import org.apache.flink.streaming.api.windowing.triggers.Trigger;
> import org.apache.flink.streaming.api.windowing.windows.Window;
> 
> public class TimeoutTrigger implements Trigger<Object, Window> {
>    private static final long serialVersionUID = 1L;
> 
>    @Override
>    public TriggerResult onElement(Object element, long timestamp, Window window, TriggerContext ctx) throws Exception {
>       // on every element it will set a timer for 30 seconds in the future
>       // a trigger can only have 1 timer so we remove the old trigger when setting the new one
>       ctx.registerProcessingTimeTimer(System.currentTimeMillis() + 30000); // this is 30 seconds but you can change it
>       return TriggerResult.CONTINUE;
>    }
> 
>    @Override
>    public TriggerResult onEventTime(long time, Window window, TriggerContext ctx) {
>       // event time is not used by this trigger
>       return TriggerResult.CONTINUE;
>    }
> 
>    @Override
>    public TriggerResult onProcessingTime(long time, Window window, TriggerContext ctx) throws Exception {
>       // the idle timer fired: emit the window contents and clear them
>       return TriggerResult.FIRE_AND_PURGE;
>    }
> 
>    @Override
>    public String toString() {
>       return "TimeoutTrigger()";
>    }
> }
> 
> You would use it like this:
> stream.keyBy(…).window(…).trigger(new TimeoutTrigger())
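Because the trigger's logic is small, one way to reason about it (and to unit test the idea without a running cluster) is a plain-Java model with an explicit clock. The sketch below is hypothetical and not Flink code: it assumes one window per key, models only the "latest timer wins" behaviour discussed in this thread, and represents FIRE_AND_PURGE as returning and then dropping the buffered elements.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Plain-Java model of the TimeoutTrigger's per-key behaviour (no Flink involved).
public class IdleTimeoutModel<K, T> {
    private final long timeout;
    private final Map<K, List<T>> buffers = new HashMap<>();
    private final Map<K, Long> deadlines = new HashMap<>();

    public IdleTimeoutModel(long timeout) {
        this.timeout = timeout;
    }

    // Like onElement(): buffer the element and move the single timer of its key.
    public void onElement(K key, T element, long now) {
        buffers.computeIfAbsent(key, k -> new ArrayList<>()).add(element);
        deadlines.put(key, now + timeout);   // replaces the previous deadline for this key
    }

    // Like onProcessingTime() returning FIRE_AND_PURGE for every key whose deadline passed.
    public List<List<T>> advanceClock(long now) {
        List<List<T>> fired = new ArrayList<>();
        Iterator<Map.Entry<K, Long>> it = deadlines.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<K, Long> entry = it.next();
            if (entry.getValue() <= now) {
                fired.add(buffers.remove(entry.getKey()));   // fire, then purge the buffer
                it.remove();
            }
        }
        return fired;
    }

    public static void main(String[] args) {
        IdleTimeoutModel<String, String> model = new IdleTimeoutModel<>(30_000L);
        model.onElement("session-a", "click-1", 0L);
        model.onElement("session-a", "click-2", 20_000L);   // moves the deadline to 50_000
        System.out.println(model.advanceClock(40_000L));     // []
        System.out.println(model.advanceClock(50_000L));     // [[click-1, click-2]]
    }
}
```

Keeping the decision logic in a small class like this makes it easy to assert on, while the Flink trigger itself stays a thin wrapper.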
> 
> For writing to files you could use the RollingSink (https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming_guide.html#hadoop-filesystem). I think this does pretty much what you want. You can specify how large the files that it writes should be, and it can also roll to new files at a specified time interval.
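The two roll conditions mentioned above (file size and time interval) can be sketched as a small policy object. This is a hypothetical illustration in plain Java, not RollingSink's actual implementation or API; the names maxBytes and rollIntervalMillis are my own.

```java
// Hypothetical roll policy (not RollingSink code): start a new part file when the
// current one has reached maxBytes or has been open for rollIntervalMillis.
public class RollPolicySketch {
    private final long maxBytes;
    private final long rollIntervalMillis;
    private long bytesInCurrentFile = 0;
    private long currentFileOpenedAt;
    private int filesRolled = 0;

    public RollPolicySketch(long maxBytes, long rollIntervalMillis, long now) {
        this.maxBytes = maxBytes;
        this.rollIntervalMillis = rollIntervalMillis;
        this.currentFileOpenedAt = now;
    }

    // Records a write of `bytes` at time `now`; returns true if a new file was started first.
    public boolean write(long bytes, long now) {
        boolean roll = bytesInCurrentFile >= maxBytes
                || now - currentFileOpenedAt >= rollIntervalMillis;
        if (roll) {
            filesRolled++;
            bytesInCurrentFile = 0;
            currentFileOpenedAt = now;
        }
        bytesInCurrentFile += bytes;
        return roll;
    }

    public int filesRolled() {
        return filesRolled;
    }

    public static void main(String[] args) {
        RollPolicySketch policy = new RollPolicySketch(100L, 1_000L, 0L);
        System.out.println(policy.write(60L, 0L));      // false
        System.out.println(policy.write(60L, 10L));     // false
        System.out.println(policy.write(10L, 20L));     // true (120 bytes already written)
        System.out.println(policy.write(10L, 1_500L));  // true (file open for 1480 ms)
    }
}
```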
> 
> Please let us know if you need more information.
> 
> Cheers,
> Aljoscha
> > On 26 Nov 2015, at 22:13, Niels Basjes <Niels@basjes.nl> wrote:
> >
> > Hi,
> >
> > I'm trying to build something in Flink that relies heavily on the Windowing features.
> >
> > In essence what I want to build:
> > I have clickstream data coming in via Kafka. Each record (click) has a session ID and a timestamp.
> > I want to create a window for each session, and after 30 minutes of inactivity I want all events for that session (visit) to be written to disk.
> > This should result in the effect that a specific visit exists in exactly one file.
> > Since HDFS does not handle many small files well, I want to create a (set of) file(s) every 15 minutes that contains several complete visits.
> > So I need to buffer the 'completed visits' and flush them to disk in 15 minute batches.
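The 15-minute batching can be sketched by assigning each completed visit to the interval in which it was completed. Since every visit lands in exactly one batch, it would also end up in exactly one file. This is a hypothetical plain-Java sketch; the class name and the choice of completion time as the batching key are my assumptions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch: group completed visits into 15-minute flush batches,
// keyed by the start of the interval in which each visit was completed.
public class FifteenMinuteBatches {
    public static final long BATCH_MILLIS = 15 * 60 * 1000L;

    // Start of the 15-minute interval containing the given time.
    public static long batchStart(long completedAt) {
        return completedAt - Math.floorMod(completedAt, BATCH_MILLIS);
    }

    // Groups visit ids by batch; the TreeMap keeps batches in time order.
    public static Map<Long, List<String>> group(String[] visitIds, long[] completedAt) {
        if (visitIds.length != completedAt.length) {
            throw new IllegalArgumentException("arrays must have the same length");
        }
        Map<Long, List<String>> batches = new TreeMap<>();
        for (int i = 0; i < visitIds.length; i++) {
            batches.computeIfAbsent(batchStart(completedAt[i]), k -> new ArrayList<>()).add(visitIds[i]);
        }
        return batches;
    }

    public static void main(String[] args) {
        String[] ids = {"v1", "v2", "v3"};
        long[] completed = {60_000L, 800_000L, 1_000_000L};
        System.out.println(group(ids, completed));   // {0=[v1, v2], 900000=[v3]}
    }
}
```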
> >
> > What I think I need to get this is:
> > 1) A map function that assigns the visit-id (i.e. new id after 30 minutes idle)
> > 2) A window per visit-id (close the window 30 minutes after the last click)
> > 3) A window per 15 minutes that only contains windows of visits that are complete
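Step 1 (assigning a visit id that changes after 30 minutes of inactivity) can be sketched in plain Java as below. It assumes the clicks of one session arrive in timestamp order, and the "sessionId-n" id format is hypothetical; in a Flink job this per-session state would have to live in keyed, fault-tolerant state rather than in a HashMap.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of step 1: a new visit starts when a session has been idle
// for more than 30 minutes. Assumes in-order clicks per session; not a Flink MapFunction.
public class VisitIdAssigner {
    private static final long IDLE_TIMEOUT = 30 * 60 * 1000L;

    private final Map<String, Long> lastSeen = new HashMap<>();
    private final Map<String, Integer> visitCounter = new HashMap<>();

    public String assign(String sessionId, long timestamp) {
        Long previous = lastSeen.put(sessionId, timestamp);
        if (previous == null || timestamp - previous > IDLE_TIMEOUT) {
            visitCounter.merge(sessionId, 1, Integer::sum);   // start a new visit
        }
        return sessionId + "-" + visitCounter.get(sessionId);
    }

    public static void main(String[] args) {
        VisitIdAssigner assigner = new VisitIdAssigner();
        System.out.println(assigner.assign("abc", 0L));            // abc-1
        System.out.println(assigner.assign("abc", 10 * 60_000L));  // abc-1
        System.out.println(assigner.assign("abc", 50 * 60_000L));  // abc-2
    }
}
```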
> >
> > Today I've been trying to get this set up, and I think I have some parts that are heading in the right direction.
> >
> > I have some questions and I'm hoping you guys can help me:
> >
> > 1) I have trouble understanding the way a windowed stream works "exactly".
> > As a consequence, I'm having a hard time verifying whether my code does what I understand it should do.
> > I guess what would really help me is a very simple example of how to unit test such a window.
> >
> > 2) Has what I describe above perhaps already been done before? If so, any pointers are really appreciated.
> >
> > 3) Am I heading in the right direction for what I'm trying to achieve, or should I use a different API or a different approach?
> >
> > Thanks
> >
> > --
> > Best regards / Met vriendelijke groeten,
> >
> > Niels Basjes
> >
> >
> 
> 
> 
> 
> -- 
> Best regards / Met vriendelijke groeten,
> 
> Niels Basjes

