flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Carst Tankink <ctank...@bol.com>
Subject ProcessFunction gets elements out of order?
Date Fri, 02 Jun 2017 09:26:59 GMT
Hi all,

Based on the advice of Aljoscha in this ’m trying to implement a ProcessFunction that simulates
the original sliding window (using Flink 1.2.1, still). 
My current setup is as follows for a window that is windowWidth wide and slides every windowSlide:

- Keep a ListState<Tuple2<Long, InputType>>
- processElement adds the incoming element together with context.timestamp() to the list state
- If no time was started (a flag in the function instance), registerEventTimeTimer(context.timestamp()
+ windowWidth); set flag to true:
       if (!timerStarted) {
                   ctx.timerService().registerEventTimeTimer(timestamp + windowWidthMs); //
timestamp = context.timestamp()
                   timerStarted = true;
        		}
- onTimer gets the list, restricts it to all elements that are currentTime – windowWidth
in the past, and passes them on to a wrapped function that does my ‘domain logic’. The
result of that function is passed on to the collector. After that register a new timer at
currentTime + windowSlide.

I’m now testing this function using a LocalEnvironment and a stream (fromCollection) of
increasing longs, with the timestamp being the value of the long. The toplogy is: 
streamOfLongs
  .map(l -> Long.toString(l)) // Needed because myProcessFunction operates on Strings.
It also triggers parallel operation.
  .keyBy(x -> 0)
  .process(myProcessFunction)

What I’m now running into is that the first element passed to myProcessFunction is not always
the first element in the stream (unless I set the parallelism of the operators to 1), but
I need the timestamp of the first element to start off my timer chain. 
Is there a way around this? The only solution right now is changing the onTimer call to extract
all windows (from the beginning) of the state and then clean up the ListState to remove elements
that are no longer part of any future window (based on the watermark). However, this feels
a bit clunky, and might still lead to duplicate ‘windows’ without some extra bookkeeping.


BTW, it seems that the example of ProcessingFunction in the docks (of Flink 1.3.0) also has
this problem, since it sets the state to refer to the last processed element,  but if elements
are processed in parallel, they do not arrive in order, so the last processed element might
not be the most recent element.


Thanks,
Carst



Mime
View raw message