flink-user mailing list archives

From Nico Kruber <n...@data-artisans.com>
Subject Re: Aggregation by key hierarchy
Date Wed, 16 Aug 2017 08:37:13 GMT
[back to the ml...]

also including your other mail's additional content...
> I have been able to do this by the following and repeating this for every
> key + window combination. So in the above case there would be 8 blocks like
> below. (4 combinations and 2 window periods for each combination)
> 
> modelDataStream.keyBy("campaignId","adId")
>         .timeWindow(Time.minutes(1))
>         .trigger(CountTrigger.of(2))
>         .reduce(..)
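
To make the block count concrete, here is a minimal plain-Java sketch (no Flink dependency) that enumerates one entry per key combination and window size. The names AggregationPlan, Spec and plan are hypothetical and only for illustration; the combinations and the "1h"/"1d" buckets are taken from the sample model quoted further down in this thread. In a real job, each entry would correspond to one keyBy/timeWindow/reduce block like the one quoted above.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of Flink: lists every (key fields, window) pair.
class AggregationPlan {

    /** One (key fields, window size) pair, i.e. one keyBy/timeWindow block. */
    static final class Spec {
        final List<String> keyFields;
        final Duration window;

        Spec(List<String> keyFields, Duration window) {
            this.keyFields = keyFields;
            this.window = window;
        }

        @Override
        public String toString() {
            return keyFields + " @ " + window;
        }
    }

    /** Cross product of key combinations and window sizes. */
    static List<Spec> plan(List<List<String>> combinations, List<Duration> windows) {
        List<Spec> specs = new ArrayList<>();
        for (List<String> keys : combinations) {
            for (Duration window : windows) {
                specs.add(new Spec(keys, window));
            }
        }
        return specs;
    }

    public static void main(String[] args) {
        List<List<String>> combinations = Arrays.asList(
                Arrays.asList("campaignId", "adId"),
                Arrays.asList("creativeId", "campaignId"),
                Arrays.asList("campaignId"),
                Arrays.asList("publisherId", "adOrderId", "campaignId"));
        List<Duration> windows = Arrays.asList(Duration.ofHours(1), Duration.ofDays(1));

        // Each printed line stands for one keyBy(...).timeWindow(...) block.
        for (Spec spec : plan(combinations, windows)) {
            System.out.println(spec);
        }
    }
}
```

Looping over such a list while building the job would avoid writing the blocks out by hand, but it still produces one keyed, windowed stream per entry.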

As mentioned in my last email, I only see one way of reducing the duplication 
(for the key combinations), but it involves more handling on your side and 
I'd probably not recommend it. As for the different windows, I do not see 
anything you could do differently here.
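
For reference, the idea from my last email (one stream, with the finer-grained aggregates kept in a map) could be sketched roughly as below. This is plain Java without Flink: the HashMap-style field only stands in for keyed state such as a MapState, and the names HierarchyRollup, Avg, add and emit are hypothetical. It also assumes that field values never contain the "/" used to build the composite keys.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch: one pass over the records updates every hierarchy level.
class HierarchyRollup {

    /** Running sum and count for one key in the hierarchy. */
    static final class Avg {
        double sum;
        long count;

        void add(double value) {
            sum += value;
            count++;
        }

        double value() {
            return count == 0 ? 0.0 : sum / count;
        }
    }

    // Stand-in for keyed state: composite key -> running aggregate.
    private final Map<String, Avg> aggregates = new LinkedHashMap<>();

    /** Updates the country, country/state and country/state/city aggregates. */
    void add(String country, String state, String city, double value) {
        String[] keys = {
            country,
            country + "/" + state,
            country + "/" + state + "/" + city
        };
        for (String key : keys) {
            aggregates.computeIfAbsent(key, k -> new Avg()).add(value);
        }
    }

    /** Called at the end of a window: returns all averages and resets the state. */
    Map<String, Double> emit() {
        Map<String, Double> out = new LinkedHashMap<>();
        aggregates.forEach((key, avg) -> out.put(key, avg.value()));
        aggregates.clear();
        return out;
    }

    public static void main(String[] args) {
        HierarchyRollup rollup = new HierarchyRollup();
        rollup.add("US", "CA", "SF", 10);
        rollup.add("US", "CA", "LA", 20);
        rollup.add("US", "NY", "NYC", 30);
        // The composite key tells the consumer which level an average belongs to.
        rollup.emit().forEach((key, avg) -> System.out.println(key + " -> " + avg));
    }
}
```

The extra handling is visible here: you have to build the composite keys, manage the map yourself and tell the levels apart downstream, which is why I would not recommend it over separate streams.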

Maybe Aljoscha (cc'd) has an idea of how to do this better.


Nico

On Monday, 14 August 2017 19:08:29 CEST Basanth Gowda wrote:
> Hi Nico,
> Thank you. This is pretty much what I am doing; I was wondering if there is
> a better way.
> 
> If there are 10 dimensions on which I want to aggregate with 2 windows -
> this would become about 20 different combinations
> 
> Thank you
> Basanth
> 
> On Mon, Aug 14, 2017 at 12:50 PM Nico Kruber <nico@data-artisans.com> wrote:
> > Hi Basanth,
> > Let's assume you have records of the form
> > Record = {timestamp, country, state, city, value}
> > Then you'd like to create aggregates, e.g. the average, for the following
> > combinations?
> > 1) avg per country
> > 2) avg per state and country
> > 3) avg per city and state and country
> > 
> > * You could create three streams and aggregate each individually:
> > DataStream<Record> ds = //...
> > KeyedStream<Record, Tuple> ds1 = ds.keyBy("country");
> > KeyedStream<Record, Tuple> ds2 = ds.keyBy("country","state");
> > KeyedStream<Record, Tuple> ds3 = ds.keyBy("country","state","city");
> > // + your aggregation per stream ds1, ds2, ds3
> > 
> > You probably want to do different things for each of the resulting
> > aggregations anyway, so having separate streams is probably right for you.
> > 
> > * Alternatively, you could go with ds1 only and create the per-state (2)
> > and per-city (3) aggregates in a stateful aggregation function yourself,
> > e.g. in a MapState [1]. At the end of your aggregation window, you could
> > then emit those with different keys to be able to distinguish between them.
> > 
> > 
> > Nico
> > 
> > [1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/state.html
> > 
> > On Sunday, 13 August 2017 23:13:00 CEST Basanth Gowda wrote:
> > > For example - this is a sample model from one of the Apache Apex
> > > presentations.
> > > 
> > > I would want to aggregate for different combinations, and different time
> > > buckets. What is the best way to do this in Flink ?
> > > 
> > > {"keys":[{"name":"campaignId","type":"integer"},
> > >  {"name":"adId","type":"integer"},
> > >  {"name":"creativeId","type":"integer"},
> > >  {"name":"publisherId","type":"integer"},
> > >  {"name":"adOrderId","type":"integer"}],
> > >  "timeBuckets":["1h","1d"],
> > >  "values":
> > >  [{"name":"impressions","type":"integer","aggregators":["SUM"]},
> > >  {"name":"clicks","type":"integer","aggregators":["SUM"]},
> > >  {"name":"revenue","type":"integer"}],
> > >  "dimensions":
> > >  [{"combination":["campaignId","adId"]},
> > >  {"combination":["creativeId","campaignId"]},
> > >  {"combination":["campaignId"]},
> > >  {"combination":["publisherId","adOrderId","campaignId"],
> > >  "additionalValues":["revenue:SUM"]}]
> > > }
> > > 
> > > 
> > > thank you,
> > > B
> > > 
> > > On Sun, Aug 13, 2017 at 2:09 PM, Basanth Gowda <basanth.gowda@gmail.com> wrote:
> > > > Hi,
> > > > I want to aggregate hits by Country, State, City. I would have these as
> > > > tags in my sample data.
> > > > 
> > > > How would I do aggregation at different levels? The input data would be
> > > > a single record.
> > > > 
> > > > Should I do a flatMap transformation first and create 3 records from 1
> > > > input record, or is there a better way to do it?
> > > > 
> > > > thank you,
> > > > basanth

