flink-user mailing list archives

From Aljoscha Krettek <aljos...@apache.org>
Subject Re: WindowOperator - element's timestamp
Date Tue, 15 Nov 2016 12:05:47 GMT
Hi,
I understand now. For early (speculative) firing I would suggest writing a
custom trigger that repeatedly fires on processing time. We're also working
on a Trigger DSL that will make such cases simpler. For example, you would
be able to write:

window.trigger(EventTime.pastEndOfWindow().withEarlyFiring(ProcessingTime.after(Time.minutes(5))))

We're also working on FLIP-2
https://cwiki.apache.org/confluence/display/FLINK/FLIP-2+Extending+Window+Function+Metadata
to give some more metadata for window processing. What might be interesting
for you is the firing reason and maybe the firing counter.
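
As a rough illustration of the two ideas above (early firing on processing
time, and a firing reason plus firing counter attached to each pane), here is
a minimal plain-Java sketch. It is a model only and does not use Flink's API:
EarlyFiringModel, Firing, Reason and simulate are hypothetical names, time is
in abstract units, and the assumption is that the final firing happens once
the watermark reaches the window's max timestamp.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of "fire early on processing time, fire finally on the
// watermark". All names here are hypothetical; this is not Flink code.
public class EarlyFiringModel {

    /** Why a pane was emitted (models the "firing reason" idea). */
    public enum Reason { EARLY, ON_TIME }

    /** One emitted pane of a single window. */
    public static final class Firing {
        public final Reason reason;
        public final int counter;         // models the "firing counter" idea
        public final long processingTime; // when the pane was emitted

        Firing(Reason reason, int counter, long processingTime) {
            this.reason = reason;
            this.counter = counter;
            this.processingTime = processingTime;
        }

        @Override
        public String toString() {
            return reason + " #" + counter + " @" + processingTime;
        }
    }

    /**
     * Replays clock observations for one window. Each entry of clock is
     * {processingTime, watermark}; the first entry is assumed to be the
     * moment the first element arrives in the window.
     */
    public static List<Firing> simulate(long windowMaxTimestamp,
                                        long earlyInterval,
                                        long[][] clock) {
        List<Firing> firings = new ArrayList<>();
        if (clock.length == 0) {
            return firings;
        }
        // The first early timer is set relative to the first observation.
        long nextEarly = clock[0][0] + earlyInterval;
        int counter = 0;
        for (long[] tick : clock) {
            long processingTime = tick[0];
            long watermark = tick[1];
            if (watermark >= windowMaxTimestamp) {
                // Final, accurate result: the window is fired and purged.
                firings.add(new Firing(Reason.ON_TIME, counter++, processingTime));
                break;
            }
            if (processingTime >= nextEarly) {
                // Speculative result: fire, keep the contents, re-arm the timer.
                firings.add(new Firing(Reason.EARLY, counter++, processingTime));
                nextEarly = processingTime + earlyInterval;
            }
        }
        return firings;
    }

    public static void main(String[] args) {
        long[][] clock = { {0, 0}, {5, 1}, {10, 1}, {12, 2}, {15, 9}, {20, 9} };
        for (Firing f : simulate(9, 5, clock)) {
            System.out.println(f);
        }
    }
}
```

With the sample clock in main, the model emits two EARLY panes (counters 0
and 1) followed by one ON_TIME pane (counter 2). A downstream operator could
use the reason and counter to tell panes of the same window apart even if
they all carry the same timestamp.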

Cheers,
Aljoscha

On Mon, 14 Nov 2016 at 21:54 Petr Novotnik <petr.novotnik@firma.seznam.cz>
wrote:

> Aljoscha,
>
> thanks for your response. The use-case I'm after is basically providing
> "early" (inaccurate) results to downstream consumers. Suppose we're
> running aggregations for daily time windows, but we don't want to wait a
> whole day to see results. The idea is to fire the windows continuously
> before they hit their end of life (at which point they will be
> fired_and_purged and will provide the final, accurate answer.)
>
> However, if all of these "early" fired panes emit elements with a
> timestamp equaling the end-of-the-window, stateful downstream operators
> a) have no chance of distinguishing between the different panes of the same
> window and b) don't have any chance to set up timers before the
> watermark at the downstream operator advances to the "end of the day".
>
> Hope this clarifies my motivation a bit,
> P.
>
> On 11/14/2016 03:22 PM, Aljoscha Krettek wrote:
> > Hi,
> > I'm afraid the ContinuousEventTimeTrigger is a bit misleading and should
> > probably be removed. The problem is that a watermark T signals that we
> > won't see elements with a timestamp < T in the future. It does not
> > signal that we haven't already seen elements with a timestamp > T. So
> > this cannot be used to trigger at different stages of a given window.
> >
> > Do you have a concrete use case in mind for which you wanted to use
> > ContinuousEventTimeTrigger?
> >
> > Cheers,
> > Aljoscha
> >
> > On Mon, 14 Nov 2016 at 09:58 Ufuk Celebi <uce@apache.org> wrote:
> >
> >     Looping in Kostas and Aljoscha who should know what's the expected
> >     behaviour here ;)
> >
> >
> >     On 11 November 2016 at 16:17:23, Petr Novotnik
> >     (petr.novotnik@firma.seznam.cz) wrote:
> >     > Hello,
> >     >
> >     > I'm struggling to understand the following behaviour of the
> >     > `WindowOperator` and would appreciate some insight from experts:
> >     >
> >     > In particular I'm thinking about the following hypothetical data flow:
> >     >
> >     > input.keyBy(..)
> >     > .window(TumblingEventTimeWindows.of(..))
> >     > .apply(..)
> >     > ...
> >     > .keyBy(..)
> >     > .window(CustomWindowAssignerUsingATriggerBasedOnTheElementsStamp)
> >     > .apply(..)
> >     >
> >     > When the first window operator fires a window based on the timer, the
> >     > emitted elements are assigned a timestamp which equals
> >     > `window.maxTimestamp()`. This stamp is then available in the second
> >     > window operator's trigger through the `onElement` method. So far so good.
> >     >
> >     > However, when using `ContinuousEventTimeTrigger` (simply put, when firing
> >     > the window multiple times at different times in its lifecycle) in the
> >     > first window operator, _all_ of the elements of this window, no matter
> >     > whether fired as a partial or the final window result, will arrive with
> >     > the same stamp in the (downstream) operators.
> >     >
> >     > This makes it practically impossible to use
> >     > `ContinuousEventTimeTrigger` (or similar) again in the second window
> >     > operator to achieve "early firing" there as well.
> >     >
> >     > This is surprising. I would expect the elements to be assigned the stamp
> >     > of the timer which fired them (which will be window#maxTimestamp() for
> >     > `TumblingEventTimeWindows`). Is there any particular reason for the
> >     > unconditional assignment to `window.maxTimestamp()`?
> >     >
> >     > Many thanks in advance,
> >     > P.
> >     >
> >
>
