flink-user mailing list archives

From Nico Kruber <n...@data-artisans.com>
Subject Re: Aggregation by key hierarchy
Date Mon, 21 Aug 2017 14:31:08 GMT
Hi Basanth,
no, you cannot add data streams or re-wire your program at runtime.
As with any other program change, you'd have to take a savepoint (to keep
operator state and exactly-once semantics) and restart the new program code
from there.

For a few combinations, I'd probably choose the second option for simplicity,
but for more combinations, option 1 seems better (map your key combinations
to different tuple keys, key by that tuple, and apply the window operations
afterwards).
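
To make the data flow of option 1 a bit more concrete, here is a minimal sketch in plain Java (deliberately without any Flink dependency, so it is not Flink API code). Each record is expanded into one composite key per configured key combination, and a single grouping step then aggregates all of them. The class name `KeyExpansion`, the helpers `expand`, `sumByKey` and `record`, and the `|`-separated key format are assumptions made up for this example. In an actual job the expansion would live in the flatMap, and the grouping would be a keyBy on the composite key followed by the window operation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java sketch of option 1; all names here are hypothetical, not Flink API.
class KeyExpansion {

    // A key combination is an ordered list of field names, e.g. [campaignId, adId].
    // Returns one composite key per combination, e.g. "campaignId=1|adId=7".
    static List<String> expand(Map<String, String> record, List<List<String>> combinations) {
        List<String> keys = new ArrayList<>();
        for (List<String> combination : combinations) {
            StringBuilder key = new StringBuilder();
            for (String field : combination) {
                if (key.length() > 0) {
                    key.append('|');
                }
                key.append(field).append('=').append(record.get(field));
            }
            keys.add(key.toString());
        }
        return keys;
    }

    // Stand-in for "keyBy(compositeKey) + window + reduce": sums one value per composite key.
    static Map<String, Long> sumByKey(List<Map<String, String>> records,
                                      List<List<String>> combinations,
                                      String valueField) {
        Map<String, Long> sums = new TreeMap<>();
        for (Map<String, String> record : records) {
            long value = Long.parseLong(record.get(valueField));
            for (String key : expand(record, combinations)) {
                sums.merge(key, value, Long::sum);
            }
        }
        return sums;
    }

    // Small helper to build a sample record with three fields.
    static Map<String, String> record(String campaignId, String adId, String clicks) {
        Map<String, String> r = new LinkedHashMap<>();
        r.put("campaignId", campaignId);
        r.put("adId", adId);
        r.put("clicks", clicks);
        return r;
    }

    public static void main(String[] args) {
        // In the external-config scenario, this list is what would be reloaded at runtime.
        List<List<String>> combinations = Arrays.asList(
                Arrays.asList("campaignId"),
                Arrays.asList("campaignId", "adId"));
        List<Map<String, String>> records = Arrays.asList(
                record("1", "7", "2"),
                record("1", "8", "3"));
        sumByKey(records, combinations, "clicks")
                .forEach((key, sum) -> System.out.println(key + " -> " + sum));
    }
}
```

Since the combinations are plain data here, changing them does not change the shape of the pipeline, which is why this variant can tolerate a config change without re-wiring the job.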

Option 2 may also require more slots to be available since it has more
operators [1], and its load may not be evenly balanced, depending on your
input data and the work associated with it. Since option 1's window operators
aggregate all the different tuples, load distribution may be better. Other
than that, the communication pattern is similar. To get a better understanding
of the performance impact, you'd have to benchmark with your aggregation and
input data though.


Nico

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/concepts/programming-model.html#parallel-dataflows

On Wednesday, 16 August 2017 13:24:42 CEST Basanth Gowda wrote:
> Thanks Nico.
> 
> As there are 2 ways to achieve this which is better ?
> 
> 1st option -> dataStream.flatMap( ... ) -> this takes an input and gives
> me N outputs, depending on my key combinations. The same windowing logic
> is applied to each of the outputs
> 
> or the one you suggested
> 
> 2nd option -> use keyBy to create N number of streams
> 
> With the first option I would use an external config, and it allows me to
> change the number of combinations dynamically at runtime. Would it be
> possible with the 2nd option as well ? Can I modify or add data streams at
> runtime without restarting ?
> 
> On Wed, Aug 16, 2017 at 4:37 AM, Nico Kruber <nico@data-artisans.com> wrote:
> > [back to the ml...]
> > 
> > also including your other mail's additional content...
> > 
> > > I have been able to do this by the following and repeating this for
> > > every key + window combination. So in the above case there would be 8
> > > blocks like below. (4 combinations and 2 window periods for each
> > > combination)
> > > 
> > > modelDataStream.keyBy("campaignId", "adId")
> > >         .timeWindow(Time.minutes(1))
> > >         .trigger(CountTrigger.of(2))
> > >         .reduce(..)
> > 
> > As mentioned in my last email, I only see one way for reducing duplication
> > (for the key combinations) but this involves more handling from your side and
> > I'd probably not recommend this. Regarding the different windows, I do not see
> > anything you could do differently here.
> > 
> > Maybe Aljoscha (cc'd) has an idea of how to do this better
> > 
> > 
> > Nico
> > 
> > On Monday, 14 August 2017 19:08:29 CEST Basanth Gowda wrote:
> > > Hi Nico,
> > > Thank you. This is pretty much what I am doing; I was wondering if there is
> > > a better way.
> > > 
> > > If there are 10 dimensions on which I want to aggregate with 2 windows -
> > > this would become about 20 different combinations
> > > 
> > > Thank you
> > > Basanth
> > > 
> > > On Mon, Aug 14, 2017 at 12:50 PM Nico Kruber <nico@data-artisans.com> wrote:
> > > > Hi Basanth,
> > > > Let's assume you have records of the form
> > > > Record = {timestamp, country, state, city, value}
> > > > Then you'd like to create aggregates, e.g. the average, for the following
> > > > combinations?
> > > > 1) avg per country
> > > > 2) avg per state and country
> > > > 3) avg per city and state and country
> > > > 
> > > > * You could create three streams and aggregate each individually:
> > > > DataStream<Record> ds = //...
> > > > // keyBy returns a KeyedStream; keeping that type keeps window operations available
> > > > KeyedStream<Record, Tuple> ds1 = ds.keyBy("country");
> > > > KeyedStream<Record, Tuple> ds2 = ds.keyBy("country","state");
> > > > KeyedStream<Record, Tuple> ds3 = ds.keyBy("country","state","city");
> > > > // + your aggregation per stream ds1, ds2, ds3
> > > > 
> > > > You probably want to do different things for each of the resulting
> > > > aggregations anyway, so having separate streams is probably right for you.
> > 
> > > > * Alternatively, you could go with ds1 only and create the per-state (2)
> > > > and per-city (3) aggregates in a stateful aggregation function yourself,
> > > > e.g. in a MapState [1]. At the end of your aggregation window, you could
> > > > then emit those with different keys to be able to distinguish between
> > > > them.
> > > > 
> > > > 
> > > > Nico
> > > > 
> > > > [1]
> > > > https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/state.html
> > > > 
> > > > On Sunday, 13 August 2017 23:13:00 CEST Basanth Gowda wrote:
> > > > > For example - this is a sample model from one of the Apache Apex
> > > > > presentations.
> > > > > 
> > > > > I would want to aggregate for different combinations, and different time
> > > > > buckets. What is the best way to do this in Flink ?
> > > > > 
> > > > > {"keys":
> > > > >  [{"name":"campaignId","type":"integer"},
> > > > >  {"name":"adId","type":"integer"},
> > > > >  {"name":"creativeId","type":"integer"},
> > > > >  {"name":"publisherId","type":"integer"},
> > > > >  {"name":"adOrderId","type":"integer"}],
> > > > >  "timeBuckets":["1h","1d"],
> > > > >  "values":
> > > > >  [{"name":"impressions","type":"integer","aggregators":["SUM"]},
> > > > >  {"name":"clicks","type":"integer","aggregators":["SUM"]},
> > > > >  {"name":"revenue","type":"integer"}],
> > > > >  "dimensions":
> > > > >  [{"combination":["campaignId","adId"]},
> > > > >  {"combination":["creativeId","campaignId"]},
> > > > >  {"combination":["campaignId"]},
> > > > >  {"combination":["publisherId","adOrderId","campaignId"],
> > > > >   "additionalValues":["revenue:SUM"]}]
> > > > > }
> > > > > 
> > > > > 
> > > > > thank you,
> > > > > B
> > > > > 
> > > > > On Sun, Aug 13, 2017 at 2:09 PM, Basanth Gowda <basanth.gowda@gmail.com> wrote:
> > > > > > Hi,
> > > > > > I want to aggregate hits by Country, State, City. I would have these as
> > > > > > tags in my sample data.
> > > > > > 
> > > > > > How would I do aggregation at different levels ? Input data would be a
> > > > > > single record
> > > > > > 
> > > > > > Should I do flatMap transformation first and create 3 records from 1
> > > > > > input record, or is there a better way to do it ?
> > > > > > 
> > > > > > thank you,
> > > > > > basanth

