flink-user mailing list archives

From Stephen Connolly <stephen.alan.conno...@gmail.com>
Subject Re: Reduce one event under multiple keys
Date Sun, 10 Feb 2019 19:36:33 GMT
On Sun, 10 Feb 2019 at 09:09, Chesnay Schepler <chesnay@apache.org> wrote:

> This sounds reasonable to me.
>
> I'm a bit confused by this question: "*Additionally, I am (naïvely)
> hoping that if a window has no events for a particular key, the
> memory/storage costs are zero for that key.*"
>
> Are you asking whether a key that was received in window X (as part of an
> event) is still present in window x+1? If so, then the answer is no; a key
> will only be present in a given window if an event was received that fits
> into that window.
>

To confirm:

So let's say I'm tracking the average time a file is open, rolled up by folder.

In window N we get the events:

{"source":"ca:fe:ba:be","action":"open","path":"/foo/bar/README.txt"}
{"source":"ca:fe:ba:be","action":"close","path":"/foo/bar/README.txt","duration":"67"}
{"source":"ca:fe:ba:be","action":"open","path":"/foo/bar/User guide.txt"}
{"source":"ca:fe:ba:be","action":"open","path":"/foo/bar/Admin guide.txt"}

So there will be aggregates stored for
("ca:fe:ba:be","/"), ("ca:fe:ba:be","/foo"), ("ca:fe:ba:be","/foo/bar"),
("ca:fe:ba:be","/foo/bar/README.txt"), etc
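
As a minimal sketch of how those keys could be derived from one path (plain Java with no Flink dependency; the `PathKeys` class and `prefixes` method are hypothetical names of mine, and the path is assumed to be absolute with no trailing slash):

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: expands a path into the root, each ancestor folder, and the path itself. */
class PathKeys {
    static List<String> prefixes(String path) {
        List<String> keys = new ArrayList<>();
        keys.add("/");                          // the root folder always receives the event
        int idx = path.indexOf('/', 1);         // first separator after the leading '/'
        while (idx != -1) {
            keys.add(path.substring(0, idx));   // an ancestor folder, e.g. "/foo"
            idx = path.indexOf('/', idx + 1);
        }
        if (path.length() > 1) {
            keys.add(path);                     // the file (or deepest folder) itself
        }
        return keys;
    }

    public static void main(String[] args) {
        // Pair each prefix with the source ID to form the composite key.
        for (String p : prefixes("/foo/bar/README.txt")) {
            System.out.println("(\"ca:fe:ba:be\",\"" + p + "\")");
        }
    }
}
```

In a real job this expansion would presumably sit inside the flatMap, emitting one (key, event) pair per prefix ahead of the keyBy.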

In window N+1 we do not get any events at all.

So the memory used by my aggregation functions from window N will be freed
and the storage will be effectively zero (modulo any follow-on processing
that might be on a longer window).
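
To spell out the behaviour I am expecting, here is a toy model in plain Java. It is not Flink's actual state handling, only an illustration of "state exists for a key only while a window has events for it"; `WindowModel`, `Acc`, and `fireAndPurge` are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model of per-key window state; an assumption-laden sketch, not Flink's implementation. */
class WindowModel {
    /** Accumulator for one key: open count, close count, and summed close durations. */
    static class Acc {
        long opens;
        long closes;
        long totalDuration;

        double averageDuration() {
            return closes == 0 ? 0.0 : (double) totalDuration / closes;
        }
    }

    private final Map<String, Acc> state = new HashMap<>();

    /** Records one event under one key; the accumulator is created lazily on first use. */
    void add(String key, String action, long duration) {
        Acc acc = state.computeIfAbsent(key, k -> new Acc());
        if ("open".equals(action)) {
            acc.opens++;
        } else if ("close".equals(action)) {
            acc.closes++;
            acc.totalDuration += duration;
        }
    }

    /** Returns the window's results and drops all per-key state. */
    Map<String, Acc> fireAndPurge() {
        Map<String, Acc> results = new HashMap<>(state);
        state.clear();
        return results;
    }

    /** Number of keys currently holding state. */
    int liveKeys() {
        return state.size();
    }
}
```

In this model, a window that receives no events never creates an accumulator, so `liveKeys()` stays at zero after the previous window has fired.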

This seems to be what you are saying... in which case my naïve hope was
not so naïve! w00t!


>
> On 08.02.2019 13:21, Stephen Connolly wrote:
>
> Ok, I'll try and map my problem into something that should be familiar to
> most people.
>
> Consider a collection of PCs, each of which has a unique ID, e.g.
> ca:fe:ba:be, de:ad:be:ef, etc.
>
> Each PC has a tree of local files. Some of the file paths are
> coincidentally the same names, but there is no file sharing between PCs.
>
> I need to produce metrics about how often files are opened and how long
> they are open for.
>
> For every X-minute tumbling window I need not just the cumulative averages
> for each PC, but the averages for each file as well as the cumulative
> averages for each folder and its sub-folders.
>
> I have a stream of events like
>
>
> {"source":"ca:fe:ba:be","action":"open","path":"/foo/bar/README.txt"}
> {"source":"ca:fe:ba:be","action":"close","path":"/foo/bar/README.txt","duration":"67"}
> {"source":"de:ad:be:ef","action":"open","path":"/foo/manchu/README.txt"}
> {"source":"ca:fe:ba:be","action":"open","path":"/foo/bar/User guide.txt"}
> {"source":"ca:fe:ba:be","action":"open","path":"/foo/bar/Admin guide.txt"}
> {"source":"ca:fe:ba:be","action":"close","path":"/foo/bar/User guide.txt","duration":"97"}
> {"source":"ca:fe:ba:be","action":"close","path":"/foo/bar/Admin guide.txt","duration":"196"}
> {"source":"ca:fe:ba:be","action":"open","path":"/foo/manchu/README.txt"}
> {"source":"de:ad:be:ef","action":"open","path":"/bar/foo/README.txt"}
>
> So from that I would like to know stuff like:
>
> ca:fe:ba:be had 4/X opens per minute in the X minute window
> ca:fe:ba:be had 3/X closes per minute in the X minute window and the
> average time open was (67+97+196)/3=120... there is no guarantee that the
> closes will be matched with opens in the same window, which is why I'm only
> tracking them separately
> de:ad:be:ef had 2/X opens per minute in the X minute window
> ca:fe:ba:be /foo had 4/X opens per minute in the X minute window
> ca:fe:ba:be /foo had 3/X closes per minute in the X minute window and the
> average time open was 120
> de:ad:be:ef /foo had 1/X opens per minute in the X minute window
> de:ad:be:ef /bar had 1/X opens per minute in the X minute window
> de:ad:be:ef /foo/manchu had 1/X opens per minute in the X minute window
> de:ad:be:ef /bar/foo had 1/X opens per minute in the X minute window
> de:ad:be:ef /foo/manchu/README.txt had 1/X opens per minute in the X
> minute window
> de:ad:be:ef /bar/foo/README.txt had 1/X opens per minute in the X minute
> window
> etc
>
> What I think I want to do is turn each event into a series of events with
> different keys, so that
>
> {"source":"ca:fe:ba:be","action":"open","path":"/foo/bar/README.txt"}
>
> gets sent under the keys:
>
> ("ca:fe:ba:be","/")
> ("ca:fe:ba:be","/foo")
> ("ca:fe:ba:be","/foo/bar")
> ("ca:fe:ba:be","/foo/bar/README.txt")
>
> Then I could use a window aggregation function to just:
>
> * count the "open" events
> * count the "close" events and sum their duration
>
> Additionally, I am (naïvely) hoping that if a window has no events for a
> particular key, the memory/storage costs are zero for that key.
>
> From what I can see, to achieve what I am trying to do, I could use a
> flatMap followed by a keyBy.
>
> In other words I take the events and flat map them based on the path split
> on '/' returning a Tuple of the (to be) key and the event. Then I can use
> keyBy to key based on the Tuple 0.
>
> My ask:
>
> Is the above design a good design? How would you achieve the end game
> better? Do I need to worry about many paths that are accessed rarely and
> would have an accumulator function that stays at 0 unless there are events
> in that window... or are the accumulators for each distinct key eagerly
> purged each time the trigger fires?
>
> What gotchas do I need to look out for?
>
> Thanks in advance and apologies for the length.
>
> -stephenc
>
>
>
