flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Aljoscha Krettek <aljos...@apache.org>
Subject Re: Working with the Windowing functionality
Date Fri, 27 Nov 2015 10:28:40 GMT
Hi Niels,
do the records that arrive from Kafka already have the session ID or do you want to assign
them inside your Flink job based on the idle timeout?

For the rest of your problems you should be able to get by with what Flink provides:

The triggering can be done using a custom Trigger that fires after we haven’t seen an element
for 30 minutes.
public class TimeoutTrigger implements Trigger<Object, Window> {
   private static final long serialVersionUID = 1L;

   public TriggerResult onElement(Object element, long timestamp, Window window, TriggerContext
ctx) throws Exception {
      // on every element it will set a timer for 30 seconds in the future
      // a trigger can only have 1 timer so we remove the old trigger when setting the new
      ctx.registerProcessingTimeTimer(System.currentTimeMillis() + 30000); // this is 30 seconds
but you can change it
      return TriggerResult.CONTINUE;

   public TriggerResult onEventTime(long time, Window window, TriggerContext ctx) {
      return TriggerResult.CONTINUE;

   public TriggerResult onProcessingTime(long time, Window window, TriggerContext ctx) throws
Exception {
      return TriggerResult.FIRE_AND_PURGE;

   public String toString() {
      return "TimeoutTrigger()";

you would use it like this:
stream.keyBy(…).window(…).trigger(new TimeoutTrigger())

For writing to files you could use the RollingSink (https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming_guide.html#hadoop-filesystem).
I think this does pretty much what you want. You can specify how large the files that it writes
are, and it can also roll to new files on a specified time interval.

Please let us know if you need more information.

> On 26 Nov 2015, at 22:13, Niels Basjes <Niels@basjes.nl> wrote:
> Hi,
> I'm trying to build something in Flink that relies heavily on the Windowing features.
> In essence what I want to build:
> I have clickstream data coming in via Kafka. Each record (click) has a sessionid and
a timestamp.
> I want to create a window for each session and after 30 minutes idle I want all events
for that session (visit) to be written to disk.
> This should result in the effect that a specific visit exists in exactly one file.
> Since HDFS does not like 'small files' I want to create a (set of) files every 15 minutes
that contains several complete  visits.
> So I need to buffer the 'completed visits' and flush them to disk in 15 minute batches.
> What I think I need to get this is:
> 1) A map function that assigns the visit-id (i.e. new id after 30 minutes idle)
> 2) A window per visit-id (close the window 30 minutes after the last click) 
> 3) A window per 15 minutes that only contains windows of visits that are complete 
> Today I've been trying to get this setup and I think I have some parts that are in the
right direction.
> I have some questions and I'm hoping you guys can help me:
> 1) I have trouble understanding the way a windowed stream works "exactly". 
> As a consequence I'm having a hard time verifying if my code does what I understand it
should do. 
> I guess what would really help me is a very simple example on how to unittest such a
> 2) Is what I describe above perhaps already been done before? If so; any pointers are
really appreciated.
> 3) Am I working in the right direction for what I'm trying to achieve; or should I use
a different API? a different approach?
> Thanks
> -- 
> Best regards / Met vriendelijke groeten,
> Niels Basjes

View raw message