flink-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Gyula Fóra <gyula.f...@gmail.com>
Subject Re: Periodic full stream aggregations
Date Tue, 21 Apr 2015 21:18:26 GMT
Hey,

The current code supports 2 types of aggregations, simple binary reduce:
T,T=>T and also the grouped version for this, where the reduce function is
applied per a user defined key (so there we keep a map of reduced values).
This can already be used to implement fairly complex logic if we transform
the data to a proper type before passing it to the reducer.

As a next step we can make this work with fold + combiners as well, where
your initial data type is T and your fould function is T,R => R and a
combiner is R,R => R.

At that point I think any sensible aggregation can be implemented.

Regards,
Gyula


On Tue, Apr 21, 2015 at 10:50 PM, Bruno Cadonna <
cadonna@informatik.hu-berlin.de> wrote:

> -----BEGIN PGP SIGNED MESSAGE-----
> Hash: SHA1
>
> Hi Gyula,
>
> fair enough!
>
> I used a bad example.
>
> What I really wanted to know is whether your code supports only
> aggregation like sum, min, and max where you need to pass only a value
> to the next aggregation or also more complex data structures, e.g., a
> synopsis of the full stream, to compute an aggregation such as an
> approximate count distinct (item count)?
>
> Cheers,
> Bruno
>
> On 21.04.2015 15:18, Gyula Fóra wrote:
> > You are right, but you should never try to compute full stream
> > median, thats the point :D
> >
> > On Tue, Apr 21, 2015 at 2:52 PM, Bruno Cadonna <
> > cadonna@informatik.hu-berlin.de> wrote:
> >
> > Hi Gyula,
> >
> > I read your comments of your PR.
> >
> > I have a question to this comment:
> >
> > "It only allows aggregations so we dont need to keep the full
> > history in a buffer."
> >
> > What if the user implements an aggregation function like a median?
> >
> > For a median you need the full history, don't you?
> >
> > Am I missing something?
> >
> > Cheers, Bruno
> >
> > On 21.04.2015 14:31, Gyula Fóra wrote:
> >>>> I have opened a PR for this feature:
> >>>>
> >>>> https://github.com/apache/flink/pull/614
> >>>>
> >>>> Cheers, Gyula
> >>>>
> >>>> On Tue, Apr 21, 2015 at 1:10 PM, Gyula Fóra
> >>>> <gyula.fora@gmail.com> wrote:
> >>>>
> >>>>> Thats a good idea, I will modify my PR to that :)
> >>>>>
> >>>>> Gyula
> >>>>>
> >>>>> On Tue, Apr 21, 2015 at 12:09 PM, Fabian Hueske
> >>>>> <fhueske@gmail.com> wrote:
> >>>>>
> >>>>>> Is it possible to switch the order of the statements,
> >>>>>> i.e.,
> >>>>>>
> >>>>>> dataStream.every(Time.of(4,sec)).reduce(...) instead of
> >>>>>> dataStream.reduce(...).every(Time.of(4,sec))
> >>>>>>
> >>>>>> I think that would be more consistent with the structure
> >>>>>> of the remaining API.
> >>>>>>
> >>>>>> Cheers, Fabian
> >>>>>>
> >>>>>> 2015-04-21 10:57 GMT+02:00 Gyula Fóra
> >>>>>> <gyfora@apache.org>:
> >>>>>>
> >>>>>>> Hi Bruno,
> >>>>>>>
> >>>>>>> Of course you can do that as well. (That's the good
> >>>>>>> part :p )
> >>>>>>>
> >>>>>>> I will open a PR soon with the proposed changes (first
> >>>>>>> without breaking
> >>>>>> the
> >>>>>>> current Api) and I will post it here.
> >>>>>>>
> >>>>>>> Cheers, Gyula
> >>>>>>>
> >>>>>>> On Tuesday, April 21, 2015, Bruno Cadonna <
> >>>>>> cadonna@informatik.hu-berlin.de
> >>>>>>>>
> >>>>>>> wrote:
> >>>>>>>
> >>>> Hi Gyula,
> >>>>
> >>>> I have a question regarding your suggestion.
> >>>>
> >>>> Can the current continuous aggregation be also specified with
> >>>> your proposed periodic aggregation?
> >>>>
> >>>> I am thinking about something like
> >>>>
> >>>> dataStream.reduce(...).every(Count.of(1))
> >>>>
> >>>> Cheers, Bruno
> >>>>
> >>>> On 20.04.2015 22:32, Gyula Fóra wrote:
> >>>>>>>>>> Hey all,
> >>>>>>>>>>
> >>>>>>>>>> I think we are missing a quite useful feature
> >>>>>>>>>> that could be implemented (with some slight
> >>>>>>>>>> modifications) on top of the current windowing
> >>>>>>>>>> api.
> >>>>>>>>>>
> >>>>>>>>>> We currently provide 2 ways of aggregating (or
> >>>>>>>>>> reducing) over streams: doing a continuous
> >>>>>>>>>> aggregation and always output the aggregated
> >>>>>>>>>> value (which cannot be done properly in parallel)
> >>>>>>>>>> or doing aggregation in a window periodically.
> >>>>>>>>>>
> >>>>>>>>>> What we don't have at the moment is periodic
> >>>>>>>>>> aggregations on the whole stream. I would even
go
> >>>>>>>>>> as far as to remove the continuous outputting
> >>>>>>>>>> reduce/aggregate it and replace it with this
> >>>>>>>>>> version as this in return can be done properly
in
> >>>>>>>>>> parallel.
> >>>>>>>>>>
> >>>>>>>>>> My suggestion would be that a call:
> >>>>>>>>>>
> >>>>>>>>>> dataStream.reduce(..) dataStream.sum(..)
> >>>>>>>>>>
> >>>>>>>>>> would return a windowed data stream where the
> >>>>>>>>>> window is the whole record history, and the
user
> >>>>>>>>>> would need to define a trigger to get the actual
> >>>>>>>>>> reduced values like:
> >>>>>>>>>>
> >>>>>>>>>> dataStream.reduce(...).every(Time.of(4,sec))
to
> >>>>>>>>>> get the actual reduced results.
> >>>>>>>>>> dataStream.sum(...).every(...)
> >>>>>>>>>>
> >>>>>>>>>> I think the current data stream
> >>>>>>>>>> reduce/aggregation is very confusing without
> >>>>>>>>>> being practical for any normal use-case.
> >>>>>>>>>>
> >>>>>>>>>> Also this would be a very api breaking change
> >>>>>>>>>> (but I would still make this change as it is
much
> >>>>>>>>>> more intuitive than the current behaviour) so
I
> >>>>>>>>>> would try to push it before the release if we
can
> >>>>>>>>>> agree.
> >>>>>>>>>>
> >>>>>>>>>> Cheers, Gyula
> >>>>>>>>>>
> >>>>
> >>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>
> >>>>>
> >>>>
> >
> >>
> >
>
> - --
> ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
>
>   Dr. Bruno Cadonna
>   Postdoctoral Researcher
>
>   Databases and Information Systems
>   Department of Computer Science
>   Humboldt-Universität zu Berlin
>
>   http://www.informatik.hu-berlin.de/~cadonnab
>
> ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
> -----BEGIN PGP SIGNATURE-----
> Version: GnuPG v1
>
> iQEcBAEBAgAGBQJVNrgwAAoJEKdCIJx7flKwBbUIALXaXY3WuQw5ZG/TPrUZLl7d
> jLI0syhM62rv8larlpC6xGLxIHDDLfABSD/F+amXE6afmYqM4cb2R9tsjWuRzKt8
> IWJoqT17EetTw82brOfy+kLCdm+URbPa1IzbuGeg02/zx/DmWXavnBilwSr679mC
> kbaGPgQ/6mVN6p4GL873CXhep4R89YQVmIG+9pQaesvh//lqTkV/8eXjP2jKN4Oq
> gYnWIwScJ9QfsyRj3jRs7lVLXeIq5ID94UkLryZnn5dEIRnoxfq6bHR0pVUbQJgp
> jwZRtT5CX83U3KUvstZ0z6M6ButbCWq8ol2Gf6ZOVpZfzj68Fz1PtbZyJTFhpDU=
> =bhGt
> -----END PGP SIGNATURE-----
>

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message