flink-issues mailing list archives

From "Suneel Marthi (JIRA)" <j...@apache.org>
Subject [jira] [Assigned] (FLINK-1272) Add a "reduceWithKey" function
Date Tue, 25 Nov 2014 16:38:12 GMT

     [ https://issues.apache.org/jira/browse/FLINK-1272?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Suneel Marthi reassigned FLINK-1272:
------------------------------------

    Assignee: Suneel Marthi

> Add a "reduceWithKey" function
> ------------------------------
>
>                 Key: FLINK-1272
>                 URL: https://issues.apache.org/jira/browse/FLINK-1272
>             Project: Flink
>          Issue Type: New Feature
>          Components: Java API, Scala API
>            Reporter: Stephan Ewen
>            Assignee: Suneel Marthi
>
> Flink does not assume a key/value model for grouping/aggregating/joining. The keys are
> specified as positions or paths of the objects to be grouped/joined.
> Currently, we do not expose the key in the {{ReduceFunction}} and {{GroupReduceFunction}},
> but only pass the objects themselves (or iterators over them).
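The next example is a minimal sketch of the current situation in plain Java. It uses hypothetical local stand-ins (`Collector`, `PlainGroupReduce`, and a `SomePOJO` assumed to have `id` and `timestamp` fields) in place of Flink's own classes, so it runs without Flink on the classpath. A key-less group reduce only receives the grouped objects, so it has to recover the key from one of them:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hypothetical stand-in for org.apache.flink.util.Collector (collect only).
interface Collector<T> {
    void collect(T record);
}

// Hypothetical stand-in for a key-less group reduce, shaped like Flink's
// GroupReduceFunction.reduce(Iterable, Collector).
interface PlainGroupReduce<IN, OUT> {
    void reduce(Iterable<IN> values, Collector<OUT> out);
}

// Assumed element type: "id" is the grouping field, "timestamp" a long.
class SomePOJO {
    final String id;
    final long timestamp;

    SomePOJO(String id, long timestamp) {
        this.id = id;
        this.timestamp = timestamp;
    }
}

class StatusQuoExample {
    // Emits "id=minTimestamp" for one group. The function is never told the
    // key, so it re-reads "id" from the first element of the group.
    static final PlainGroupReduce<SomePOJO, String> MIN_TIMESTAMP = (values, out) -> {
        Iterator<SomePOJO> it = values.iterator();
        if (!it.hasNext()) {
            return; // defensive: nothing to emit for an empty group
        }
        SomePOJO first = it.next();
        String key = first.id; // boilerplate that a key-aware function would remove
        long min = first.timestamp;
        while (it.hasNext()) {
            min = Math.min(min, it.next().timestamp);
        }
        out.collect(key + "=" + min);
    };

    // Runs the function on one pre-grouped list and returns what it emitted.
    static List<String> runOnGroup(List<SomePOJO> group) {
        List<String> results = new ArrayList<>();
        MIN_TIMESTAMP.reduce(group, results::add);
        return results;
    }

    public static void main(String[] args) {
        List<SomePOJO> group = Arrays.asList(new SomePOJO("a", 7), new SomePOJO("a", 3));
        System.out.println(runOnGroup(group)); // prints [a=3]
    }
}
```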
> Since accessing the key is a common need, I suggest adding a convenience function
> {{GroupReduceWithKeyFunction}} with the following signature, which could be called as follows:
> {code}
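> import org.apache.flink.util.Collector;
>
> public interface GroupReduceWithKeyFunction<KEY, IN, OUT> {
>     // Called once per group with the group's key and all of its elements.
>     void reduceGroup(KEY key, Iterable<IN> values, Collector<OUT> out);
> }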
> {code}
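Below is one possible way to offer the proposed interface on top of an existing key-less group reduce. It assumes the key can be recomputed from any element by a key-extractor function. That extractor, `KeyExtractor`, is hypothetical and only similar in spirit to Flink's `KeySelector`. `WithKeyAdapter` and the stand-in `Collector` are likewise illustrative names, not Flink API. Every element of a group has the same key, so reading it from the first element is enough:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hypothetical stand-in for org.apache.flink.util.Collector (collect only).
interface Collector<T> {
    void collect(T record);
}

// The interface proposed in the issue.
interface GroupReduceWithKeyFunction<KEY, IN, OUT> {
    void reduceGroup(KEY key, Iterable<IN> values, Collector<OUT> out);
}

// Hypothetical key extractor, similar in spirit to Flink's KeySelector.
interface KeyExtractor<IN, KEY> {
    KEY getKey(IN value);
}

// Hypothetical adapter exposing a key-aware function through a key-less
// reduce(Iterable, Collector) method.
class WithKeyAdapter<KEY, IN, OUT> {
    private final KeyExtractor<IN, KEY> keyExtractor;
    private final GroupReduceWithKeyFunction<KEY, IN, OUT> function;

    WithKeyAdapter(KeyExtractor<IN, KEY> keyExtractor,
                   GroupReduceWithKeyFunction<KEY, IN, OUT> function) {
        this.keyExtractor = keyExtractor;
        this.function = function;
    }

    void reduce(Iterable<IN> values, Collector<OUT> out) {
        final Iterator<IN> it = values.iterator();
        if (!it.hasNext()) {
            return; // empty group: nothing to reduce
        }
        // All elements of a group share one key, so the first element suffices.
        final IN first = it.next();
        KEY key = keyExtractor.getKey(first);
        // Replay the consumed first element in front of the remaining ones so
        // the input is traversed only once (group iterables may be single-pass).
        // Like the input, this replayed view is meant to be iterated once.
        Iterable<IN> replay = () -> new Iterator<IN>() {
            private boolean firstPending = true;

            @Override
            public boolean hasNext() {
                return firstPending || it.hasNext();
            }

            @Override
            public IN next() {
                if (firstPending) {
                    firstPending = false;
                    return first;
                }
                return it.next();
            }
        };
        function.reduceGroup(key, replay, out);
    }
}

class WithKeyAdapterDemo {
    // Counts one group's elements; the key is each string's first character.
    static List<String> countGroup(List<String> group) {
        List<String> results = new ArrayList<>();
        WithKeyAdapter<Character, String, String> adapter =
            new WithKeyAdapter<Character, String, String>(
                s -> s.charAt(0),
                (key, values, out) -> {
                    int n = 0;
                    for (String ignored : values) {
                        n++;
                    }
                    out.collect(key + ":" + n);
                });
        adapter.reduce(group, results::add);
        return results;
    }

    public static void main(String[] args) {
        System.out.println(countGroup(Arrays.asList("a1", "a2", "a3"))); // prints [a:3]
    }
}
```

The first element is replayed instead of calling `values.iterator()` a second time. This is deliberate, because a group iterable that supports only one pass would fail on the second call.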
> Scala:
> {code}
> val data: DataSet[SomePOJO] = ...
> data
>   .groupBy("id")
>   .reduceGroup((key, values, out: Collector[(String, Long)]) =>
>     out.collect((key, values.minBy(_.timestamp).timestamp)))
> {code}
> Java:
> {code}
> DataSet<SomePOJO> data = ...
> data
>   .groupBy("id")
>   .reduceGroup(
>       new GroupReduceWithKeyFunction<String, SomePOJO, Tuple2<String, Long>>() {
>           ...
>       });
> {code}
> The same ...
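The Java example in the issue elides the function body. The following is a rough, self-contained sketch of what `data.groupBy("id").reduceGroup(...)` with the proposed interface would compute. It runs the Scala example's logic (earliest timestamp per `id`) on an in-memory list. Several names here are hypothetical:

* `LocalGroupReduce.groupAndReduce`
* `Pair`, standing in for Flink's `Tuple2`
* the stand-in `Collector`
* the assumed shape of `SomePOJO`

A real Flink job would distribute the grouping rather than run it through a local map:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;

// Hypothetical stand-in for org.apache.flink.util.Collector (collect only).
interface Collector<T> {
    void collect(T record);
}

// The interface proposed in the issue.
interface GroupReduceWithKeyFunction<KEY, IN, OUT> {
    void reduceGroup(KEY key, Iterable<IN> values, Collector<OUT> out);
}

// Assumed element type: "id" is the grouping field, "timestamp" a long.
class SomePOJO {
    final String id;
    final long timestamp;

    SomePOJO(String id, long timestamp) {
        this.id = id;
        this.timestamp = timestamp;
    }
}

// Hypothetical minimal pair type standing in for Flink's Tuple2.
class Pair<A, B> {
    final A f0;
    final B f1;

    Pair(A f0, B f1) {
        this.f0 = f0;
        this.f1 = f1;
    }

    @Override
    public String toString() {
        return "(" + f0 + "," + f1 + ")";
    }
}

class LocalGroupReduce {
    // Hypothetical local stand-in for data.groupBy(key).reduceGroup(fn):
    // buckets elements by key (sorted, for deterministic output) and calls
    // the function once per group, passing the group's key.
    static <KEY extends Comparable<KEY>, IN, OUT> List<OUT> groupAndReduce(
            List<IN> data, Function<IN, KEY> keyFn,
            GroupReduceWithKeyFunction<KEY, IN, OUT> fn) {
        Map<KEY, List<IN>> groups = new TreeMap<>();
        for (IN element : data) {
            groups.computeIfAbsent(keyFn.apply(element), k -> new ArrayList<>()).add(element);
        }
        List<OUT> results = new ArrayList<>();
        for (Map.Entry<KEY, List<IN>> group : groups.entrySet()) {
            fn.reduceGroup(group.getKey(), group.getValue(), results::add);
        }
        return results;
    }

    // Java counterpart of the Scala example: earliest timestamp per id.
    static final GroupReduceWithKeyFunction<String, SomePOJO, Pair<String, Long>> MIN_TIMESTAMP =
        (key, values, out) -> {
            long min = Long.MAX_VALUE;
            for (SomePOJO p : values) {
                min = Math.min(min, p.timestamp);
            }
            out.collect(new Pair<String, Long>(key, min));
        };

    public static void main(String[] args) {
        List<SomePOJO> data = Arrays.asList(
            new SomePOJO("b", 5), new SomePOJO("a", 7),
            new SomePOJO("b", 2), new SomePOJO("a", 3));
        System.out.println(groupAndReduce(data, p -> p.id, MIN_TIMESTAMP)); // prints [(a,3), (b,2)]
    }
}
```

The `TreeMap` only makes the printed order deterministic. A distributed runtime does not promise any particular order of per-group results.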



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
