flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-5582) Add a general distributive aggregate function
Date Mon, 23 Jan 2017 02:51:26 GMT

    [ https://issues.apache.org/jira/browse/FLINK-5582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15833830#comment-15833830 ]

ASF GitHub Bot commented on FLINK-5582:
---------------------------------------

Github user shaoxuan-wang commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3186#discussion_r97246135
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/functions/AggregateFunction.java ---
    @@ -0,0 +1,94 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.common.functions;
    +
    +import java.io.Serializable;
    +
    +/**
    + * 
    + * <p>Aggregation functions must be {@link Serializable} because they are sent around
    + * between distributed processes during distributed execution.
    + * 
    + * <p>An example of how to use this interface is below:
    + * 
    + * <pre>{@code
    + * // the accumulator, which holds the state of the in-flight aggregate
    + * public class AverageAccumulator {
    + *     long count;
    + *     long sum;
    + * }
    + * 
    + * // implementation of an aggregation function for an 'average'
    + * public class Average implements AggregateFunction<Integer, AverageAccumulator, Double> {
    + * 
    + *     public AverageAccumulator createAccumulator() {
    + *         return new AverageAccumulator();
    + *     }
    + * 
    + *     public AverageAccumulator merge(AverageAccumulator a, AverageAccumulator b) {
    + *         a.count += b.count;
    + *         a.sum += b.sum;
    + *         return a;
    + *     }
    + * 
    + *     public void add(Integer value, AverageAccumulator acc) {
    + *         acc.sum += value;
    + *         acc.count++;
    + *     }
    + * 
    + *     public Double getResult(AverageAccumulator acc) {
    + *         return acc.sum / (double) acc.count;
    + *     }
    + * }
    + * 
    + * // implementation of a weighted average
    + * // this reuses the same accumulator type as the aggregate function for 'average'
    + * public class WeightedAverage implements AggregateFunction<Datum, AverageAccumulator, Double> {
    + *
    + *     public AverageAccumulator createAccumulator() {
    + *         return new AverageAccumulator();
    + *     }
    + *
    + *     public AverageAccumulator merge(AverageAccumulator a, AverageAccumulator b) {
    + *         a.count += b.count;
    + *         a.sum += b.sum;
    + *         return a;
    + *     }
    + *
    + *     public void add(Datum value, AverageAccumulator acc) {
    + *         acc.count += value.getWeight();
    + *         acc.sum += value.getValue() * value.getWeight();
    + *     }
    + *
    + *     public Double getResult(AverageAccumulator acc) {
    + *         return acc.sum / (double) acc.count;
    + *     }
    + * }
    + * }</pre>
    + */
    +public interface AggregateFunction<IN, ACC, OUT> extends Function, Serializable {
    +
    +	ACC createAccumulator();
    +
    +	void add(IN value, ACC accumulator);
    --- End diff --
    
    The Table API UDAGG will eventually be translated to this window stream API. Both accumulate and retract will be handled in this add function. I think it is OK if we "view retractions as adding negative values".
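To illustrate what "viewing retractions as adding negative values" could look like with the proposed interface, here is a minimal sketch. It is not Flink code: it redeclares a local copy of the interface so it compiles on its own, and `SignedValue` (a value plus a retract flag), `RetractableAverage`, and `RetractDemo` are hypothetical names. A retraction goes through the same `add` method and contributes `-value` to the sum and `-1` to the count.

```java
import java.io.Serializable;

// Local stand-in for the proposed interface (no Flink dependency).
interface AggregateFunction<IN, ACC, OUT> extends Serializable {
    ACC createAccumulator();
    void add(IN value, ACC accumulator);
    OUT getResult(ACC accumulator);
    ACC merge(ACC a, ACC b);
}

// Hypothetical input: a value and whether it is being accumulated or retracted.
class SignedValue {
    final long value;
    final boolean retract;
    SignedValue(long value, boolean retract) { this.value = value; this.retract = retract; }
}

class AverageAccumulator {
    long count;
    long sum;
}

// Average that treats a retraction as adding a negative contribution.
class RetractableAverage implements AggregateFunction<SignedValue, AverageAccumulator, Double> {
    public AverageAccumulator createAccumulator() { return new AverageAccumulator(); }

    public void add(SignedValue in, AverageAccumulator acc) {
        long sign = in.retract ? -1 : 1;
        acc.sum += sign * in.value; // a retraction subtracts the value...
        acc.count += sign;          // ...and removes one element from the count
    }

    public Double getResult(AverageAccumulator acc) {
        // No result once every value has been retracted.
        return acc.count == 0 ? null : acc.sum / (double) acc.count;
    }

    public AverageAccumulator merge(AverageAccumulator a, AverageAccumulator b) {
        a.count += b.count;
        a.sum += b.sum;
        return a;
    }
}

class RetractDemo {
    public static void main(String[] args) {
        RetractableAverage avg = new RetractableAverage();
        AverageAccumulator acc = avg.createAccumulator();
        avg.add(new SignedValue(10, false), acc);
        avg.add(new SignedValue(20, false), acc);
        avg.add(new SignedValue(30, false), acc);
        avg.add(new SignedValue(30, true), acc); // retract the 30
        System.out.println(avg.getResult(acc));  // 15.0
    }
}
```

This works for an average because sum and count can be decremented exactly; aggregates such as MAX or MIN cannot undo a value this way and would need additional state.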


> Add a general distributive aggregate function
> ---------------------------------------------
>
>                 Key: FLINK-5582
>                 URL: https://issues.apache.org/jira/browse/FLINK-5582
>             Project: Flink
>          Issue Type: New Feature
>          Components: Streaming
>            Reporter: Stephan Ewen
>            Assignee: Stephan Ewen
>             Fix For: 1.3.0
>
>
> The {{DataStream}} API currently has two aggregation functions that can be used on windows and in state, both of which have limitations:
>   - {{ReduceFunction}} supports only a single type, used both for the values that are added and for the aggregated/returned result.
>   - {{FoldFunction}} supports different types to add and return, but is not distributive, i.e. it cannot be used for hierarchical aggregation, for example to split the aggregation into pre- and final-aggregation.
> I suggest adding a generic and powerful aggregation function that supports:
>   - Different types to add, accumulate, and return
>   - The ability to merge partial aggregates by merging their accumulators.
> The proposed interface is below. This type of interface is found in many APIs, such as those of various databases, and also in Apache Beam:
>   - The accumulator is the state of the running aggregate
>   - Accumulators can be merged
>   - Values are added to the accumulator
>   - Getting the result from the accumulator performs an optional finalizing operation
> {code}
> public interface AggregateFunction<IN, ACC, OUT> extends Function {
> 	ACC createAccumulator();
> 	void add(IN value, ACC accumulator);
> 	OUT getResult(ACC accumulator);
> 	ACC merge(ACC a, ACC b);
> }
> {code}
> Example use:
> {code}
> public class AverageAccumulator {
>     long count;
>     long sum;
> }
> // implementation of a simple average
> public class Average implements AggregateFunction<Integer, AverageAccumulator, Double> {
>     public AverageAccumulator createAccumulator() {
>         return new AverageAccumulator();
>     }
>     public AverageAccumulator merge(AverageAccumulator a, AverageAccumulator b) {
>         a.count += b.count;
>         a.sum += b.sum;
>         return a;
>     }
>     public void add(Integer value, AverageAccumulator acc) {
>         acc.sum += value;
>         acc.count++;
>     }
>     public Double getResult(AverageAccumulator acc) {
>         return acc.sum / (double) acc.count;
>     }
> }
> // implementation of a weighted average
> // this reuses the same accumulator type as the aggregate function for 'average'
> public class WeightedAverage implements AggregateFunction<Datum, AverageAccumulator, Double> {
>     public AverageAccumulator createAccumulator() {
>         return new AverageAccumulator();
>     }
>     public AverageAccumulator merge(AverageAccumulator a, AverageAccumulator b) {
>         a.count += b.count;
>         a.sum += b.sum;
>         return a;
>     }
>     public void add(Datum value, AverageAccumulator acc) {
>     acc.count += value.getWeight();
>     acc.sum += value.getValue() * value.getWeight();
>     }
>     public Double getResult(AverageAccumulator acc) {
>         return acc.sum / (double) acc.count;
>     }
> }
> {code}
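The `Datum` type in the weighted-average example is left undefined. The sketch below makes that example runnable under an assumption: `Datum` is a hypothetical class with `long` value and weight fields, and the interface is redeclared locally instead of importing Flink. A weighted average is sum(weight * value) / sum(weight), so `add` multiplies each value by its weight before adding it to `sum`, while `count` accumulates the weights.

```java
import java.io.Serializable;

// Local stand-in for the proposed interface (no Flink dependency).
interface AggregateFunction<IN, ACC, OUT> extends Serializable {
    ACC createAccumulator();
    void add(IN value, ACC accumulator);
    OUT getResult(ACC accumulator);
    ACC merge(ACC a, ACC b);
}

// Hypothetical input type: the original example does not define Datum.
class Datum {
    private final long value;
    private final long weight;
    Datum(long value, long weight) { this.value = value; this.weight = weight; }
    long getValue() { return value; }
    long getWeight() { return weight; }
}

class AverageAccumulator {
    long count; // used here as the sum of weights
    long sum;   // used here as the sum of weight * value
}

class WeightedAverage implements AggregateFunction<Datum, AverageAccumulator, Double> {
    public AverageAccumulator createAccumulator() { return new AverageAccumulator(); }

    public AverageAccumulator merge(AverageAccumulator a, AverageAccumulator b) {
        a.count += b.count;
        a.sum += b.sum;
        return a;
    }

    public void add(Datum value, AverageAccumulator acc) {
        acc.count += value.getWeight();
        acc.sum += value.getValue() * value.getWeight();
    }

    public Double getResult(AverageAccumulator acc) {
        return acc.sum / (double) acc.count;
    }
}

class WeightedAverageDemo {
    public static void main(String[] args) {
        WeightedAverage fn = new WeightedAverage();
        AverageAccumulator acc = fn.createAccumulator();
        fn.add(new Datum(10, 1), acc);
        fn.add(new Datum(20, 3), acc);
        System.out.println(fn.getResult(acc)); // (10*1 + 20*3) / 4 = 17.5
    }
}
```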

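The issue's argument for distributivity (splitting an aggregation into pre- and final-aggregation) can be illustrated with a small local driver. This is a sketch, not how Flink executes windows: the interface is redeclared locally, the "partitions" are plain lists, and `AverageLength` and `TwoPhase` are hypothetical names. It also shows the three distinct types the proposal asks for (`String` in, `AverageAccumulator` accumulated, `Double` out). Each partition pre-aggregates into its own accumulator; the partial accumulators are then merged and `getResult` is called once.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Local stand-in for the proposed interface (no Flink dependency).
interface AggregateFunction<IN, ACC, OUT> extends Serializable {
    ACC createAccumulator();
    void add(IN value, ACC accumulator);
    OUT getResult(ACC accumulator);
    ACC merge(ACC a, ACC b);
}

class AverageAccumulator {
    long count;
    long sum;
}

// Average string length: IN = String, ACC = AverageAccumulator, OUT = Double.
class AverageLength implements AggregateFunction<String, AverageAccumulator, Double> {
    public AverageAccumulator createAccumulator() { return new AverageAccumulator(); }
    public void add(String value, AverageAccumulator acc) {
        acc.sum += value.length();
        acc.count++;
    }
    public Double getResult(AverageAccumulator acc) { return acc.sum / (double) acc.count; }
    public AverageAccumulator merge(AverageAccumulator a, AverageAccumulator b) {
        a.count += b.count;
        a.sum += b.sum;
        return a;
    }
}

final class TwoPhase {
    static <IN, ACC, OUT> OUT aggregate(AggregateFunction<IN, ACC, OUT> fn, List<List<IN>> partitions) {
        // Phase 1 (pre-aggregation): one accumulator per partition.
        List<ACC> partials = new ArrayList<>();
        for (List<IN> partition : partitions) {
            ACC acc = fn.createAccumulator();
            for (IN value : partition) {
                fn.add(value, acc);
            }
            partials.add(acc);
        }
        // Phase 2 (final aggregation): merge the partial accumulators, then finalize once.
        ACC total = fn.createAccumulator();
        for (ACC partial : partials) {
            total = fn.merge(total, partial);
        }
        return fn.getResult(total);
    }

    public static void main(String[] args) {
        List<List<String>> partitions = Arrays.asList(
                Arrays.asList("a", "bb"),
                Arrays.asList("ccc", "dddd"));
        System.out.println(aggregate(new AverageLength(), partitions)); // 2.5
    }
}
```

Because `merge` combines accumulators rather than finished results, the outcome does not depend on how the input is partitioned. A {{FoldFunction}} offers no way to combine two partial results, which is why it cannot be split this way.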


