flink-user mailing list archives

From Basanth Gowda <basanth.go...@gmail.com>
Subject Re: Aggregation by key hierarchy
Date Wed, 16 Aug 2017 11:24:42 GMT
Thanks Nico.

As there are 2 ways to achieve this, which is better?

1st option -> dataStream.flatMap( ... ) -> this takes one input record and gives
me N outputs, depending on my key combinations. The same windowing logic is
then applied to each of the outputs.

or the one you suggested

2nd option -> use keyBy to create N streams

With the first option I would use an external config, which allows me to
change the number of combinations dynamically at runtime. Would that be
possible with the 2nd option as well? Can I modify or add a data stream at
runtime without restarting?
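A minimal plain-Java sketch of the per-record logic that the 1st option would put inside a Flink FlatMapFunction. It has no Flink dependency so it runs standalone; the class and method names (CombinationExpander, expand, compositeKey) and the map-based record are hypothetical, and the four combinations are the ones from the sample model further down this thread. Each input yields one composite key per configured combination, so a single downstream keyBy on that key and one window definition can serve all combinations. How the combination list would be reloaded from external config at runtime is not shown.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java sketch (no Flink dependency): the per-record logic that the
// first option would place inside a FlatMapFunction.
public class CombinationExpander {

    // Builds a composite key such as "campaignId=1|adId=7" for one combination.
    // Field names are part of the key, so (with integer values) results of
    // different combinations cannot end up under the same key.
    static String compositeKey(Map<String, Integer> record, List<String> combination) {
        StringBuilder sb = new StringBuilder();
        for (String field : combination) {
            if (sb.length() > 0) {
                sb.append('|');
            }
            sb.append(field).append('=').append(record.get(field));
        }
        return sb.toString();
    }

    // Returns one composite key per configured combination; in a Flink
    // FlatMapFunction each key would be emitted (with the record's values)
    // through the Collector, and the stream keyed on it afterwards.
    static List<String> expand(Map<String, Integer> record, List<List<String>> combinations) {
        List<String> out = new ArrayList<>();
        for (List<String> combination : combinations) {
            out.add(compositeKey(record, combination));
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, Integer> record = new LinkedHashMap<>();
        record.put("campaignId", 1);
        record.put("adId", 7);
        record.put("creativeId", 3);
        record.put("publisherId", 5);
        record.put("adOrderId", 9);

        // The four combinations from the sample model; with the first option
        // this list would be read from external configuration (not shown).
        List<List<String>> combinations = Arrays.asList(
                Arrays.asList("campaignId", "adId"),
                Arrays.asList("creativeId", "campaignId"),
                Arrays.asList("campaignId"),
                Arrays.asList("publisherId", "adOrderId", "campaignId"));

        for (String key : expand(record, combinations)) {
            System.out.println(key);
        }
    }
}
```

Putting the field names into the key costs a few bytes per record, but it keeps the output of each combination separable downstream without a second key field.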

On Wed, Aug 16, 2017 at 4:37 AM, Nico Kruber <nico@data-artisans.com> wrote:

> [back to the ml...]
>
> also including your other mail's additional content...
> > I have been able to do this by the following and repeating this for every
> > key + window combination. So in the above case there would be 8 blocks like
> > the one below (4 combinations and 2 window periods for each combination).
> >
> > modelDataStream.keyBy("campaignId","adId")
> >         .timeWindow(Time.minutes(1))
> >         .trigger(CountTrigger.of(2))
> >         .reduce(..)
>
> As mentioned in my last email, I only see one way of reducing duplication
> (for the key combinations), but this involves more handling on your side and
> I'd probably not recommend it. Regarding the different windows, I do not see
> anything you could do differently here.
>
> Maybe Aljoscha (cc'd) has an idea of how to do this better.
>
>
> Nico
>
> On Monday, 14 August 2017 19:08:29 CEST Basanth Gowda wrote:
> > Hi Nico,
> > Thank you. This is pretty much what I am doing; I was wondering if there
> > is a better way.
> >
> > If there are 10 dimensions on which I want to aggregate with 2 windows,
> > this would become about 20 different combinations.
> >
> > Thank you
> > Basanth
> >
> > On Mon, Aug 14, 2017 at 12:50 PM Nico Kruber <nico@data-artisans.com> wrote:
> > > Hi Basanth,
> > > Let's assume you have records of the form
> > > Record = {timestamp, country, state, city, value}
> > > Then you'd like to create aggregates, e.g. the average, for the following
> > > combinations?
> > > 1) avg per country
> > > 2) avg per state and country
> > > 3) avg per city and state and country
> > >
> > > * You could create three streams and aggregate each individually:
> > > DataStream<Record> ds = //...
> > > DataStream<Record> ds1 = ds.keyBy("country");
> > > DataStream<Record> ds2 = ds.keyBy("country","state");
> > > DataStream<Record> ds3 = ds.keyBy("country","state","city");
> > > // + your aggregation per stream ds1, ds2, ds3
> > >
> > > You probably want to do different things for each of the resulting
> > > aggregations anyway, so having separate streams is probably right for you.
> > >
> > > * Alternatively, you could go with ds1 only and compute the per-state (2)
> > > and per-city (3) aggregates yourself in a stateful aggregation function,
> > > e.g. in a MapState [1]. At the end of your aggregation window, you could
> > > then emit those with different keys to be able to distinguish between
> > > them.
> > >
> > >
> > > Nico
> > >
> > > [1]
> > > https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/state.html
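The second suggestion above (key by country only and keep the per-state and per-city aggregates yourself) can be sketched in plain Java as follows. This is not Flink code and rests on several assumptions: a TreeMap stands in for Flink's MapState, averages() plays the role of the logic that would run when a per-country window fires, and the Hit class and the "level:path" key format are hypothetical. The distinct key prefixes are what let downstream operators tell the three levels apart.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java sketch: all hits of one country's window are aggregated at the
// country, state and city level in a single pass. The TreeMap stands in for
// Flink's MapState; Hit and the "level:path" keys are hypothetical.
public class HierarchicalAverages {

    static final class Hit {
        final String country;
        final String state;
        final String city;
        final double value;

        Hit(String country, String state, String city, double value) {
            this.country = country;
            this.state = state;
            this.city = city;
            this.value = value;
        }
    }

    // Adds one value to the {sum, count} accumulator stored under key.
    static void accumulate(Map<String, double[]> acc, String key, double value) {
        double[] sumCount = acc.computeIfAbsent(key, k -> new double[2]);
        sumCount[0] += value;
        sumCount[1] += 1;
    }

    // Returns one average per distinct country, state and city key; the key
    // prefix tells downstream consumers which level a result belongs to.
    static Map<String, Double> averages(List<Hit> window) {
        Map<String, double[]> acc = new TreeMap<>();
        for (Hit h : window) {
            accumulate(acc, "country:" + h.country, h.value);
            accumulate(acc, "state:" + h.country + "/" + h.state, h.value);
            accumulate(acc, "city:" + h.country + "/" + h.state + "/" + h.city, h.value);
        }
        Map<String, Double> result = new TreeMap<>();
        for (Map.Entry<String, double[]> e : acc.entrySet()) {
            result.put(e.getKey(), e.getValue()[0] / e.getValue()[1]);
        }
        return result;
    }

    public static void main(String[] args) {
        List<Hit> window = new ArrayList<>();
        window.add(new Hit("US", "CA", "SF", 10));
        window.add(new Hit("US", "CA", "LA", 20));
        window.add(new Hit("US", "NY", "NYC", 30));
        averages(window).forEach((key, avg) -> System.out.println(key + " -> " + avg));
    }
}
```

Keeping {sum, count} rather than a running average means partial results stay mergeable, which is the usual reason to store accumulators instead of finished values in state.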
> > > On Sunday, 13 August 2017 23:13:00 CEST Basanth Gowda wrote:
> > > > For example, this is a sample model from one of the Apache Apex
> > > > presentations.
> > > >
> > > > I would like to aggregate over different combinations and different
> > > > time buckets. What is the best way to do this in Flink?
> > > >
> > > > {"keys":[{"name":"campaignId","type":"integer"},
> > > >          {"name":"adId","type":"integer"},
> > > >          {"name":"creativeId","type":"integer"},
> > > >          {"name":"publisherId","type":"integer"},
> > > >          {"name":"adOrderId","type":"integer"}],
> > > >  "timeBuckets":["1h","1d"],
> > > >  "values":
> > > >   [{"name":"impressions","type":"integer","aggregators":["SUM"]},
> > > >    {"name":"clicks","type":"integer","aggregators":["SUM"]},
> > > >    {"name":"revenue","type":"integer"}],
> > > >  "dimensions":
> > > >   [{"combination":["campaignId","adId"]},
> > > >    {"combination":["creativeId","campaignId"]},
> > > >    {"combination":["campaignId"]},
> > > >    {"combination":["publisherId","adOrderId","campaignId"],
> > > >     "additionalValues":["revenue:SUM"]}]
> > > > }
> > > >
> > > >
> > > > thank you,
> > > > B
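To make the "4 combinations times 2 time buckets = 8 aggregations" arithmetic concrete, here is a plain-Java sketch of what those keyed, tumbling-window SUMs compute over a finite list of events. It is not Flink code; in Flink each (combination, bucket) pair would correspond to one keyBy(...).timeWindow(...) pipeline like the block quoted earlier. Assumptions: epoch-millisecond timestamps, epoch-aligned buckets, and the hypothetical Event class and key format; revenue and "additionalValues" are left out for brevity.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java sketch (not Flink code) of what keyed, tumbling-window SUMs over
// every (dimension combination, time bucket) pair compute for a finite list of
// events. Epoch-millisecond timestamps and epoch-aligned buckets are assumed.
public class BucketedSums {

    static final long HOUR = 3_600_000L;
    static final long DAY = 24 * HOUR;

    // Hypothetical event carrying the model's key fields and two summed values.
    static final class Event {
        final long timestamp;
        final Map<String, Integer> keys;
        final long impressions;
        final long clicks;

        Event(long timestamp, Map<String, Integer> keys, long impressions, long clicks) {
            this.timestamp = timestamp;
            this.keys = keys;
            this.impressions = impressions;
            this.clicks = clicks;
        }
    }

    // Returns {impressions, clicks} sums keyed by combination, bucket size and
    // bucket start, e.g. "campaignId=1|size=3600000@0".
    static Map<String, long[]> aggregate(List<Event> events,
                                         List<List<String>> combinations,
                                         List<Long> bucketSizes) {
        Map<String, long[]> sums = new TreeMap<>();
        for (Event e : events) {
            for (List<String> combination : combinations) {
                StringBuilder prefix = new StringBuilder();
                for (String field : combination) {
                    prefix.append(field).append('=').append(e.keys.get(field)).append('|');
                }
                for (long size : bucketSizes) {
                    // Start of the epoch-aligned tumbling bucket containing the event.
                    long start = e.timestamp - (e.timestamp % size);
                    String key = prefix + "size=" + size + "@" + start;
                    long[] s = sums.computeIfAbsent(key, k -> new long[2]);
                    s[0] += e.impressions;
                    s[1] += e.clicks;
                }
            }
        }
        return sums;
    }

    public static void main(String[] args) {
        Map<String, Integer> keys = new HashMap<>();
        keys.put("campaignId", 1);
        keys.put("adId", 7);
        List<Event> events = Arrays.asList(
                new Event(0L, keys, 10, 1),
                new Event(HOUR + 1, keys, 5, 2));
        // Two of the model's combinations and its two buckets ("1h", "1d").
        List<List<String>> combinations = Arrays.asList(
                Arrays.asList("campaignId", "adId"),
                Arrays.asList("campaignId"));
        Map<String, long[]> sums = aggregate(events, combinations, Arrays.asList(HOUR, DAY));
        sums.forEach((k, v) -> System.out.println(k + " impressions=" + v[0] + " clicks=" + v[1]));
    }
}
```

With the full model, every event would update one accumulator per (combination, bucket) pair, i.e. 8 of them, which is the same count as the 8 keyBy/window blocks mentioned in the thread.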
> > > >
> > > > On Sun, Aug 13, 2017 at 2:09 PM, Basanth Gowda <basanth.gowda@gmail.com> wrote:
> > > > > Hi,
> > > > > I want to aggregate hits by Country, State, City. I would have these
> > > > > as tags in my sample data.
> > > > >
> > > > > How would I do aggregation at different levels? The input data would
> > > > > be a single record.
> > > > >
> > > > > Should I do a flatMap transformation first and create 3 records from
> > > > > 1 input record, or is there a better way to do it?
> > > > >
> > > > > thank you,
> > > > > basanth
>
>
