flink-user mailing list archives

From Nico Kruber <n...@data-artisans.com>
Subject Re: Aggregation by key hierarchy
Date Mon, 14 Aug 2017 16:49:54 GMT
Hi Basanth,
Let's assume you have records of the form
Record = {timestamp, country, state, city, value}
Then you'd like to create aggregates, e.g. the average, for the following 
combinations?
1) avg per country
2) avg per state and country
3) avg per city and state and country

* You could create three streams and aggregate each individually:
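DataStream<Record> ds = //...
// keyBy(...) returns a KeyedStream; keep that type so you can window/aggregate it.
// Field-name keys like "country" require Record to be a POJO (or tuple) type.
KeyedStream<Record, Tuple> ds1 = ds.keyBy("country");
KeyedStream<Record, Tuple> ds2 = ds.keyBy("country", "state");
KeyedStream<Record, Tuple> ds3 = ds.keyBy("country", "state", "city");
// + your aggregation per stream ds1, ds2, ds3 (e.g. a window plus an aggregate)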
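
Outside of Flink, the three keyBy calls boil down to grouping the same records by three different key extractors. Here is a minimal plain-Java sketch of that idea, assuming a simple Record class with the fields above (class, field and method names are hypothetical). It computes the averages in memory, without windows or streaming, purely to show what each of ds1, ds2 and ds3 would produce:

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical in-memory illustration; not Flink code.
class HierarchyAverages {

    // Hypothetical record type mirroring {timestamp, country, state, city, value}.
    static final class Record {
        final long timestamp;
        final String country, state, city;
        final double value;

        Record(long timestamp, String country, String state, String city, double value) {
            this.timestamp = timestamp;
            this.country = country;
            this.state = state;
            this.city = city;
            this.value = value;
        }
    }

    // One key extractor per hierarchy level, analogous to the three keyBy(...) calls.
    static final Function<Record, List<String>> BY_COUNTRY =
            r -> Arrays.asList(r.country);
    static final Function<Record, List<String>> BY_STATE =
            r -> Arrays.asList(r.country, r.state);
    static final Function<Record, List<String>> BY_CITY =
            r -> Arrays.asList(r.country, r.state, r.city);

    // Average of 'value' per key; each call corresponds to one of ds1, ds2, ds3.
    static Map<List<String>, Double> averageBy(List<Record> records,
                                               Function<Record, List<String>> key) {
        return records.stream()
                .collect(Collectors.groupingBy(key, Collectors.averagingDouble(r -> r.value)));
    }
}
```

Calling averageBy(records, BY_STATE) yields one average per (country, state) pair; in the streaming job each of these groupings would additionally be scoped to a window.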

Since you will probably want to treat each of the resulting aggregations 
differently anyway, separate streams are likely the right choice for you.
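
The flatMap idea from the original question (turning one input record into three) also works if you prefer a single keyed stream: each copy carries a composite key that encodes its hierarchy level, so one keyBy on that key covers all three levels. In Flink the fan-out would sit in a FlatMapFunction in front of that keyBy. The following plain-Java sketch only illustrates the fan-out and the single grouping step; the "L1|"/"L2|"/"L3|" key format and all names are assumptions for illustration:

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory illustration of the fan-out idea; not Flink code.
class FanOutByLevel {

    // Hypothetical (compositeKey, value) pair produced by the fan-out step.
    static final class Keyed {
        final String key;
        final double value;

        Keyed(String key, double value) {
            this.key = key;
            this.value = value;
        }
    }

    // One input record becomes three entries, one per hierarchy level.
    // The "L1|", "L2|", "L3|" prefixes (an assumed format) keep the levels apart.
    static List<Keyed> fanOut(String country, String state, String city, double value) {
        return Arrays.asList(
                new Keyed("L1|" + country, value),
                new Keyed("L2|" + country + "|" + state, value),
                new Keyed("L3|" + country + "|" + state + "|" + city, value));
    }

    // A single grouping over the composite key computes all levels at once.
    static Map<String, Double> averages(List<Keyed> entries) {
        Map<String, double[]> sumAndCount = new TreeMap<>();
        for (Keyed e : entries) {
            double[] acc = sumAndCount.computeIfAbsent(e.key, k -> new double[2]);
            acc[0] += e.value; // running sum
            acc[1] += 1;       // running count
        }
        Map<String, Double> result = new TreeMap<>();
        for (Map.Entry<String, double[]> entry : sumAndCount.entrySet()) {
            result.put(entry.getKey(), entry.getValue()[0] / entry.getValue()[1]);
        }
        return result;
    }
}
```

The trade-off is that every record is emitted three times, in exchange for a single downstream aggregation instead of three.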

* Alternatively, you could key by country only (ds1) and compute the per-state 
(2) and per-city (3) aggregates yourself in a stateful aggregation function, 
e.g. using MapState [1]. At the end of your aggregation window, you could then 
emit them with different keys so that you can tell them apart.
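
A rough sketch of that second approach, assuming one aggregator instance per country key: a plain TreeMap stands in for Flink's MapState so that the code runs without Flink, and emit() plays the role of the end-of-window callback. Class, method and key-prefix names are hypothetical:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical per-country aggregator; one instance per keyBy("country") key.
class PerCountryAggregator {

    // Running sum and count for one (sub-)key.
    static final class SumCount {
        double sum;
        long count;

        void add(double value) {
            sum += value;
            count++;
        }

        double avg() {
            return sum / count;
        }
    }

    private final String country;
    private SumCount countryTotal = new SumCount();
    // Stand-in for a MapState<String, SumCount>; keys are "state" or "state|city"
    // (assumes state and city names contain no '|').
    private final Map<String, SumCount> subAggregates = new TreeMap<>();

    PerCountryAggregator(String country) {
        this.country = country;
    }

    // Called for every record that arrives for this country during the window.
    void add(String state, String city, double value) {
        countryTotal.add(value);
        subAggregates.computeIfAbsent(state, k -> new SumCount()).add(value);
        subAggregates.computeIfAbsent(state + "|" + city, k -> new SumCount()).add(value);
    }

    // End of window: emit every level with a distinguishing key prefix, then reset.
    List<String> emit() {
        List<String> out = new ArrayList<>();
        out.add("country:" + country + "=" + countryTotal.avg());
        for (Map.Entry<String, SumCount> e : subAggregates.entrySet()) {
            String level = e.getKey().contains("|") ? "city:" : "state:";
            out.add(level + country + "|" + e.getKey() + "=" + e.getValue().avg());
        }
        countryTotal = new SumCount();
        subAggregates.clear();
        return out;
    }
}
```

Prefixing each emitted key with its level (country:, state:, city:) is what lets downstream operators tell the three kinds of aggregates apart.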


Nico

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/state.html

On Sunday, 13 August 2017 23:13:00 CEST Basanth Gowda wrote:
> For example, this is a sample model from one of the Apache Apex
> presentations.
> 
> I want to aggregate over different combinations and different time
> buckets. What is the best way to do this in Flink?
> 
> {
>   "keys": [{"name": "campaignId", "type": "integer"},
>            {"name": "adId", "type": "integer"},
>            {"name": "creativeId", "type": "integer"},
>            {"name": "publisherId", "type": "integer"},
>            {"name": "adOrderId", "type": "integer"}],
>   "timeBuckets": ["1h", "1d"],
>   "values": [{"name": "impressions", "type": "integer", "aggregators": ["SUM"]},
>              {"name": "clicks", "type": "integer", "aggregators": ["SUM"]},
>              {"name": "revenue", "type": "integer"}],
>   "dimensions": [{"combination": ["campaignId", "adId"]},
>                  {"combination": ["creativeId", "campaignId"]},
>                  {"combination": ["campaignId"]},
>                  {"combination": ["publisherId", "adOrderId", "campaignId"],
>                   "additionalValues": ["revenue:SUM"]}]
> }
> 
> 
> thank you,
> B
> 
> On Sun, Aug 13, 2017 at 2:09 PM, Basanth Gowda <basanth.gowda@gmail.com> wrote:
> > Hi,
> > I want to aggregate hits by Country, State, City. I would have these as
> > tags in my sample data.
> > 
> > How would I do aggregation at different levels? Each input would be a
> > single record.
> > 
> > Should I do a flatMap transformation first and create 3 records from 1
> > input record, or is there a better way to do it?
> > 
> > thank you,
> > basanth
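
For the Apex-style model quoted above, the same reasoning applies: every pair of dimension combination and time bucket is one keyed, windowed aggregation, so the four combinations times the two buckets give eight aggregates per event. A minimal plain-Java sketch, assuming events arrive as maps from field name to value (including "timestamp" in epoch milliseconds and "impressions"). It only sums impressions, and it aligns buckets to the epoch, as Flink's tumbling windows do by default. All names are hypothetical:

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;

// Hypothetical helper: one aggregate per (dimension combination, time bucket) pair.
class DimensionBuckets {

    // Combinations and bucket sizes taken from the sample model quoted above.
    static final List<List<String>> COMBINATIONS = Arrays.asList(
            Arrays.asList("campaignId", "adId"),
            Arrays.asList("creativeId", "campaignId"),
            Arrays.asList("campaignId"),
            Arrays.asList("publisherId", "adOrderId", "campaignId"));
    static final long[] BUCKET_SIZES_MILLIS = {
            TimeUnit.HOURS.toMillis(1), TimeUnit.DAYS.toMillis(1)};

    // Key = the combination's field values + bucket size + epoch-aligned bucket start.
    static String key(Map<String, Long> event, List<String> combination, long size) {
        StringBuilder sb = new StringBuilder();
        for (String field : combination) {
            sb.append(field).append('=').append(event.get(field)).append(';');
        }
        long ts = event.get("timestamp");
        return sb.append("size=").append(size).append(";start=").append(ts - ts % size).toString();
    }

    // Every event contributes to all 4 x 2 = 8 keys; impressions are summed per key.
    static Map<String, Long> sumImpressions(List<Map<String, Long>> events) {
        Map<String, Long> sums = new TreeMap<>();
        for (Map<String, Long> event : events) {
            for (List<String> combination : COMBINATIONS) {
                for (long size : BUCKET_SIZES_MILLIS) {
                    sums.merge(key(event, combination, size), event.get("impressions"), Long::sum);
                }
            }
        }
        return sums;
    }
}
```

In a Flink job, each of these eight (combination, bucket) pairs would become its own keyBy plus tumbling window, which matches the "separate streams" suggestion at the top; clicks and the extra revenue:SUM for the last combination would be handled the same way.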

