flink-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Fabian Hueske <fhue...@apache.org>
Subject Re: Hi / Aggregation support
Date Fri, 31 Oct 2014 21:17:40 GMT
Hi Viktor,

welcome on the dev mailing list! :-)

I agree that Flink's aggregations should be improved in various aspects:
- support more aggregation functions. Currently only MIN, MAX, SUM are
supported. Adding COUNT and AVG would be nice!
- support for multiple aggregations per field
- support for aggregations on POJO DataSets

How about to always return Tuples as the result of an aggregation. For
example something like:

DataSet<Tuple2<String, Integer>> ds = ...
DataSet<Tuple4<Tuple2<String,Integer>,Integer, Integer, Long> result =
ds.groupBy(0).min(1).andMax(1).andCnt();

or

DataSet<Tuple2<String, Integer>> ds = ...
DataSet<Tuple4<String,Integer, Integer, Long> result =
ds.groupBy(0).key(0).andMin(1).andMax(1).andCnt();

In the first version, an arbitrary element of the group is added to the
result to identify the keys. The second example explicitly extracts the key
from the original input data. POJO data types can be handled similarly by
specifying the member fields to aggregate (or copy as key) by name.

Doing aggregations "in-place" within an input data type (and leaving other
fields untouched) could be a special variant of this operator.

2014-10-31 18:50 GMT+01:00 Rosenfeld, Viktor <viktor.rosenfeld@tu-berlin.de>
:

> Hi everybody,
>
> First, I want to introduce myself to the community. I am a PhD student who
> wants to work with and improve Flink.
>
> Second, I thought to work on improving aggregations as a start. My first
> goal is to simplify the computaton of a field average. Basically, I want to
> turn this plan:
>
>     val input = env.fromCollection( Array(1L, 2L, 3L, 4L) )
>
>     input
>     .map { in => (in, 1L) }
>     .sum(0).andSum(1)
>     .map { in => in._1.toDouble / in._2.toDouble }
>     .print
>
> into this:
>
>     // val input = ...
>     input.average(0).print()
>
> My basic idea is to internally still add the counter field and execute the
> map and sum steps but to hide them from the user.
>
> Next, I want to support multiple aggregations so one can write something
> like:
>
>     input.min(0).max(0).sum(0).count(0).average(0)
>
> Internally, there should only be one pass over the input data and average
> should reuse the work done by sum and count.
>
> In September there was some discussion [1] on the semantics of the min/max
> aggregations vs. minBy/maxBy. The consensus was that min/max should not
> simply return the respective field value but return the entire tuple.
> However, for count/sum/average there is no specific tuple and it would also
> not work for combinations of min/max.
>
> One possible route is to simply return a random element, similar to MySQL.
> I think this can be very surprising to the user especially when min/max are
> combined.
>
> Another possibility is to return the tuple only for single invocations of
> min or max and return the field value for the other aggregation functions
> or combinations. This is also inconstent but appears to be more inline with
> people's expectation. Also, there might be two or more tuples with the same
> min/max value and then the question is which should be returned.
>
> I haven't yet thought about aggregations in a streaming context and I
> would appreciate any input on this.
>
> Best,
> Viktor
>
> [1]
> http://apache-flink-incubator-mailing-list-archive.1008284.n3.nabble.com/Aggregations-td1706.html
>
>

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message