flink-user mailing list archives

From Aljoscha Krettek <aljos...@apache.org>
Subject Re: ProcessFunction gets elements out of order?
Date Fri, 02 Jun 2017 14:36:35 GMT
Hi,

To answer your first question: yes, elements can arrive out of order, and in most real-world
use cases they will. Making them arrive in order can be prohibitively expensive because you
have to buffer elements and then sort them when a watermark arrives. It’s possible to do
this in custom user code, but Flink will probably not provide such functionality in the near
future. I think you could simply set a timer for “timestamp - (timestamp % slide-size) + slide-size”,
which ensures that you always get a timer on the slide boundaries. This works well because
Flink de-duplicates timers, that is, when you repeatedly set a timer for timestamp t you
will in the end only have one timer for timestamp t.
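To make the arithmetic concrete, here is a minimal plain-Java sketch of that boundary computation (no Flink dependencies; the method name `clampToSlideBoundary` is just an illustrative choice):

```java
public class SlideBoundary {
    // Mirrors "timestamp - (timestamp % slideSize) + slideSize":
    // maps every element timestamp to the end of the slide it falls into.
    static long clampToSlideBoundary(long timestamp, long slideMs) {
        return timestamp - (timestamp % slideMs) + slideMs;
    }

    public static void main(String[] args) {
        // Elements at t=17, 18, 23 with a 5 ms slide map to boundaries 20, 20, 25,
        // so the repeated registrations for t=20 collapse into a single timer.
        System.out.println(clampToSlideBoundary(17, 5)); // 20
        System.out.println(clampToSlideBoundary(18, 5)); // 20
        System.out.println(clampToSlideBoundary(23, 5)); // 25
    }
}
```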

Regarding your implementation, basing the decision on an instance field can be problematic
because the user function is re-used for processing elements of different keys. State and
timers are scoped to the key you specified, so it can happen that you only set a timer for
one of your keys and never process the data that you stored for the other keys. As I mentioned
above, you can simply always set a timer that is clamped to modulo-slide-size boundaries.
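To illustrate the pitfall with a plain-Java model (the `TreeSet` here is only a stand-in for Flink’s per-key timer queue, not its actual implementation):

```java
import java.util.*;

public class SharedFlagPitfall {
    // Model of the bug: timers live per key, but the guard flag lives on the
    // function instance that is shared across keys, so only the first key
    // processed by this instance ever registers a timer.
    static Set<String> keysWithTimers(String[] keys) {
        Map<String, TreeSet<Long>> timersPerKey = new HashMap<>();
        boolean timerStarted = false; // instance-level flag, shared across keys
        for (String key : keys) {
            if (!timerStarted) {
                timersPerKey.computeIfAbsent(key, k -> new TreeSet<>()).add(20L);
                timerStarted = true;
            }
        }
        return timersPerKey.keySet();
    }

    public static void main(String[] args) {
        // Only key "a" ever registers a timer; "b" and "c" never fire.
        System.out.println(keysWithTimers(new String[]{"a", "b", "c"})); // prints [a]
    }
}
```

Registering the clamped timer on every element instead of guarding with a flag avoids this entirely, because the duplicates are de-duplicated per key anyway.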

Best,
Aljoscha

> On 2. Jun 2017, at 11:26, Carst Tankink <ctankink@bol.com> wrote:
> 
> Hi all,
> 
> Based on Aljoscha’s advice in this thread, I’m trying to implement a ProcessFunction that
simulates the original sliding window (using Flink 1.2.1, still). 
> My current setup is as follows for a window that is windowWidth wide and slides every
windowSlide:
> 
> - Keep a ListState<Tuple2<Long, InputType>>
> - processElement adds the incoming element together with context.timestamp() to the list
state
> - If no timer was started yet (tracked by a flag on the function instance), call registerEventTimeTimer(context.timestamp()
+ windowWidth) and set the flag to true:
>       if (!timerStarted) {
>           ctx.timerService().registerEventTimeTimer(timestamp + windowWidthMs); // timestamp = context.timestamp()
>           timerStarted = true;
>       }
> - onTimer gets the list, restricts it to the elements that are at most currentTime – windowWidth
in the past, and passes them on to a wrapped function that does my ‘domain logic’. The
result of that function is passed on to the collector. After that, a new timer is registered at
currentTime + windowSlide.
> 
> I’m now testing this function using a LocalEnvironment and a stream (fromCollection)
of increasing longs, with the timestamp being the value of the long. The topology is: 
> streamOfLongs
>  .map(l -> Long.toString(l)) // Needed because myProcessFunction operates on Strings.
It also triggers parallel operation.
>  .keyBy(x -> 0)
>  .process(myProcessFunction)
> 
> What I’m now running into is that the first element passed to myProcessFunction is
not always the first element in the stream (unless I set the parallelism of the operators
to 1), but I need the timestamp of the first element to start off my timer chain. 
> Is there a way around this? The only solution right now is changing the onTimer call
to extract all windows (from the beginning) of the state and then clean up the ListState to
remove elements that are no longer part of any future window (based on the watermark). However,
this feels a bit clunky, and might still lead to duplicate ‘windows’ without some extra
bookkeeping. 
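> The pruning part of that workaround could look roughly like this in plain Java (a sketch only; `prune` is a hypothetical helper, and the ListState read/write around it is elided):

```java
import java.util.*;

public class WindowPrune {
    // Given buffered (timestamp, value) pairs and the current watermark, drop
    // everything that can no longer contribute to any future window, i.e. all
    // elements more than windowWidthMs behind the watermark.
    static List<long[]> prune(List<long[]> buffer, long watermark, long windowWidthMs) {
        List<long[]> kept = new ArrayList<>();
        for (long[] e : buffer) {
            if (e[0] > watermark - windowWidthMs) {
                kept.add(e);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        List<long[]> buffer = new ArrayList<>(List.of(
                new long[]{1, 10}, new long[]{5, 50}, new long[]{9, 90}));
        // Watermark 10, window width 5: only elements newer than t=5 can still matter.
        for (long[] e : prune(buffer, 10, 5)) {
            System.out.println(e[0]); // prints 9
        }
    }
}
```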
> 
> BTW, it seems that the example of ProcessFunction in the docs (of Flink 1.3.0) also
has this problem: it sets the state to refer to the last processed element, but if
elements are processed in parallel, they do not arrive in order, so the last processed element
might not be the most recent element.
> 
> 
> Thanks,
> Carst
> 
> 
> 

