Thanks!
@Fabian: Yepp, but this still results in multiple outputs per window,
because the maximum is emitted for every key.
@Gyula: Yepp, that's the second bullet point from my question ;) The way
I implemented it, it basically doubles the latency, because the
timeWindowAll has to wait for the next timeWindow before it can close
the previous one. So if the first timeWindow is 10s, it takes 20s until
you have a result, although it can't change after 10s. You know what I mean?
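
To make the goal concrete: the desired semantics is "one record per window, namely the key with the maximum aggregate". Here is a pure-Python sketch of that end result (not Flink code, just the target behavior), using the sample records from Fabian's summary below:

```python
# Pure-Python sketch (NOT Flink code) of the desired semantics:
# given one aggregate per (key, window), emit exactly one record
# per window -- the (key, agg) pair with the maximum aggregate.

def max_per_window(aggregates):
    """aggregates: iterable of (key, window_time, agg) tuples."""
    best = {}  # window_time -> (key, agg)
    for key, wtime, agg in aggregates:
        if wtime not in best or agg > best[wtime][1]:
            best[wtime] = (key, agg)
    return [(key, wtime, agg) for wtime, (key, agg) in best.items()]

records = [
    (1, 10, 17),
    (2, 10, 20),
    (1, 20, 30),
    (2, 20, 28),
    (3, 20, 5),
]
print(max_per_window(records))
# [(2, 10, 20), (1, 20, 30)]
```

The hard part in Flink is knowing *when* a window's maximum is final, which is exactly the latency question above.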
Cheers,
Konstantin
On 23.11.2015 11:32, Gyula Fóra wrote:
> Hi,
>
> Alright it seems there are multiple ways of doing this.
>
> I would do something like:
>
> ds.keyBy(key)
> .timeWindow(w)
> .reduce(...)
> .timeWindowAll(w)
> .reduce(...)
>
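
As a pure-Python sketch (not Flink code) of what Gyula's two-level pipeline computes: a per-key maximum inside each window, then a single maximum record per window across keys. The sample data here is illustrative:

```python
# Pure-Python sketch (NOT Flink code) of the two-level approach:
# level 1 reduces raw events to one max per (key, window),
# level 2 reduces each window's per-key results to a single record.

def two_level_max(events):
    """events: iterable of (key, window_time, value) tuples."""
    # level 1: keyBy(key).timeWindow(w).reduce -> max value per (key, window)
    per_key = {}
    for key, wtime, value in events:
        k = (key, wtime)
        per_key[k] = max(per_key.get(k, value), value)
    # level 2: timeWindowAll(w).reduce -> single max record per window
    per_window = {}
    for (key, wtime), agg in per_key.items():
        if wtime not in per_window or agg > per_window[wtime][2]:
            per_window[wtime] = (key, wtime, agg)
    return list(per_window.values())

events = [(1, 10, 5), (1, 10, 17), (2, 10, 20),
          (1, 20, 30), (2, 20, 28), (3, 20, 5)]
print(two_level_max(events))
# [(2, 10, 20), (1, 20, 30)]
```

In a batch simulation like this the latency issue is invisible; in Flink, the second window has to wait for its own close, which is Konstantin's concern above.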
> Maybe Aljoscha could jump in here :D
>
> Cheers,
> Gyula
>
> Fabian Hueske <fhueske@gmail.com> wrote (on Mon, 23 Nov 2015, 11:21):
>
> If you set the key to the time attribute, the "old" key is no longer
> valid.
> The streams are organized by time and only one aggregate for each
> window time should be computed.
>
> This should do what you are looking for:
>
> DataStream
> .keyBy(_._1) // key by original key
> .timeWindow(..)
> .apply(...) // extract window end time: (origKey, time, agg)
> .keyBy(_._2) // key by time field
> .maxBy(_._3) // value with max agg field
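
On an unbounded stream, a `maxBy` on a keyed stream behaves as a running maximum: it emits the current maximum for every incoming record, not one final record per window. A pure-Python sketch (not Flink code) of that behavior:

```python
# Pure-Python sketch (NOT Flink code) of keyBy(time).maxBy(agg) on a
# stream: a running maximum per window time, emitting the current
# maximum for EVERY incoming record.

def running_max_by_window(stream):
    """stream: iterable of (key, window_time, agg) tuples."""
    best = {}      # window_time -> record with max agg so far
    emitted = []
    for record in stream:
        key, wtime, agg = record
        if wtime not in best or agg > best[wtime][2]:
            best[wtime] = record
        emitted.append(best[wtime])  # one output per input record
    return emitted

stream = [(1, 10, 17), (2, 10, 20), (1, 20, 30), (2, 20, 28), (3, 20, 5)]
for out in running_max_by_window(stream):
    print(out)
# (1, 10, 17)
# (2, 10, 20)
# (1, 20, 30)
# (1, 20, 30)
# (1, 20, 30)
```

This matches the multiple-outputs-per-window behavior Konstantin describes in his reply below.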
>
> Best, Fabian
>
> 2015-11-23 11:00 GMT+01:00 Konstantin Knauf
> <konstantin.knauf@tngtech.com>:
>
> Hi Fabian,
>
> thanks for your answer. Yes, that's what I want.
>
> The solution you suggest is what I am doing right now (see the last
> bullet point in my question).
>
> But given your example. I would expect the following output:
>
> (key: 1, wtime: 10, agg: 17)
> (key: 2, wtime: 10, agg: 20)
> (key: 1, wtime: 20, agg: 30)
> (key: 1, wtime: 20, agg: 30)
> (key: 1, wtime: 20, agg: 30)
>
> Because the reduce function is evaluated for every incoming event
> (i.e. for each key), right?
>
> Cheers,
>
> Konstantin
>
> On 23.11.2015 10:47, Fabian Hueske wrote:
> > Hi Konstantin,
> >
> > let me first summarize to make sure I understood what you are looking for.
> > You computed an aggregate over a keyed event-time window and you are
> > looking for the maximum aggregate for each group of windows over the
> > same period of time.
> > So if you have
> > (key: 1, wtime: 10, agg: 17)
> > (key: 2, wtime: 10, agg: 20)
> > (key: 1, wtime: 20, agg: 30)
> > (key: 2, wtime: 20, agg: 28)
> > (key: 3, wtime: 20, agg: 5)
> >
> > you would like to get:
> > (key: 2, wtime: 10, agg: 20)
> > (key: 1, wtime: 20, agg: 30)
> >
> > If this is correct, you can do this as follows.
> > You can extract the window start and end time from the TimeWindow
> > parameter of the WindowFunction and key the stream either by start or
> > end time and apply a ReduceFunction on the keyed stream.
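
A pure-Python sketch (not Flink code) of the described ReduceFunction: records that share the same window time are reduced pairwise, keeping whichever record carries the larger aggregate.

```python
# Pure-Python sketch (NOT Flink code) of the described ReduceFunction:
# a pairwise reduce over records with the same window time, keeping
# the record with the larger aggregate.

from functools import reduce

def reduce_max(a, b):
    """a, b: (key, window_time, agg) records with the same window_time."""
    return a if a[2] >= b[2] else b

window_20 = [(1, 20, 30), (2, 20, 28), (3, 20, 5)]
print(reduce(reduce_max, window_20))
# (1, 20, 30)
```

In Flink the reduce would run incrementally per keyed window time; applying it pairwise over the whole group as above gives the same final result.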
> >
> > Best, Fabian
> >
> > 2015-11-23 8:41 GMT+01:00 Konstantin Knauf
> > <konstantin.knauf@tngtech.com>:
> >
> > Hi everyone,
> >
> > me again :) Let's say you have a stream, and for every window and
> > key you compute some aggregate value, like this:
> >
> > DataStream.keyBy(..)
> > .timeWindow(..)
> > .apply(...)
> >
> >
> > Now I want to get the maximum aggregate value for every window over
> > the keys. This feels like a pretty natural use case. How can I
> > achieve this with Flink in the most compact way?
> >
> > The options I thought of so far are:
> >
> > * Use an allTimeWindow, obviously. The drawback is that the
> > WindowFunction would no longer be distributed by keys.
> >
> > * Use a windowAll after the WindowFunction to create windows of the
> > aggregates which originated from the same timeWindow. This could be
> > done either with a TimeWindow or with a GlobalWindow with a
> > DeltaTrigger. Drawback: it seems unnecessarily complicated and
> > doubles the latency (at least in my naive implementation ;)).
> >
> > * Of course, you could also just keyBy the start time of the window
> > after the WindowFunction, but then you get more than one event for
> > each window.
> >
> > Is there some easy way I am missing? If not, is there a technical
> > reason why such a "reduceByKeyAndWindow" operator is not available
> > in Flink?
> >
> > Cheers,
> >
> > Konstantin
> >
> >
>

Konstantin Knauf * konstantin.knauf@tngtech.com * +491743413182
TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
Sitz: Unterföhring * Amtsgericht München * HRB 135082
