From: mbalassi@apache.org
To: commits@flink.incubator.apache.org
Date: Wed, 24 Sep 2014 19:51:34 -0000
Subject: [01/12] git commit: [FLINK-1121] [streaming] minBy and maxBy operators added to streaming api

Repository: incubator-flink
Updated Branches:
  refs/heads/master a3b02840d -> cb81319d9


[FLINK-1121] [streaming] minBy and maxBy operators added to streaming api


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/70464bb0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/70464bb0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/70464bb0

Branch: refs/heads/master
Commit: 70464bb0a44f682c155fdfdd2a6b0a6cc1203663
Parents: 30ac9fe
Author: Gyula Fora
Authored: Wed Sep 24 16:57:36 2014 +0200
Committer: mbalassi
Committed: Wed Sep 24 19:54:39 2014 +0200

----------------------------------------------------------------------
 docs/streaming_guide.md                         |   6 +-
 .../api/datastream/BatchedDataStream.java       |  69 +++++++-
 .../streaming/api/datastream/DataStream.java    |  66 +++++++
 .../api/datastream/GroupedDataStream.java       |  71 ++++++++
 .../aggregation/MaxByAggregationFunction.java   |  37 ++++
 .../aggregation/MinByAggregationFunction.java   |  55 ++++++
 .../streaming/api/AggregationFunctionTest.java  | 177 +++++++++++++++++--
 7 files changed, 460 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/70464bb0/docs/streaming_guide.md
----------------------------------------------------------------------
diff --git a/docs/streaming_guide.md b/docs/streaming_guide.md
index 27a32ba..37ff90d 100644
--- a/docs/streaming_guide.md
+++ b/docs/streaming_guide.md
@@ -246,9 +246,11 @@ When the reduce operator is applied on a grouped data stream, the user-defined `
 
 The Flink Streaming API supports different types of aggregation operators similarly to the core API. For grouped data streams the aggregations work in a grouped fashion.
 
-Types of aggregations: `sum(fieldPosition)`, `min(fieldPosition)`, `max(fieldPosition)`
+Types of aggregations: `sum(fieldPosition)`, `min(fieldPosition)`, `max(fieldPosition)`, `minBy(fieldPosition, first)`, `maxBy(fieldPosition, first)`
 
-For every incoming tuple the selected field is replaced with the current aggregated value. If the aggregations are used without defining field position, position `0` is used as default. 
+With `sum`, `min`, and `max`, the selected field of every incoming tuple is replaced with the current aggregated value. If the aggregations are used without defining a field position, position `0` is used as default.
+
+With `minBy` and `maxBy`, the output of the operator is the element with the current minimal or maximal value at the given field position. If several elements share the minimum or maximum value, the `first` boolean parameter decides whether the operator returns the first or the last of them.
 
 ### Window/Batch operators
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/70464bb0/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/BatchedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/BatchedDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/BatchedDataStream.java
index 51f1467..e8e3f31 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/BatchedDataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/BatchedDataStream.java
@@ -24,7 +24,9 @@ import org.apache.flink.api.common.functions.RichReduceFunction;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.streaming.api.function.aggregation.AggregationFunction;
 import org.apache.flink.streaming.api.function.aggregation.MaxAggregationFunction;
+import org.apache.flink.streaming.api.function.aggregation.MaxByAggregationFunction;
 import org.apache.flink.streaming.api.function.aggregation.MinAggregationFunction;
+import org.apache.flink.streaming.api.function.aggregation.MinByAggregationFunction;
 import org.apache.flink.streaming.api.function.aggregation.SumAggregationFunction;
 import org.apache.flink.streaming.api.invokable.StreamInvokable;
 import org.apache.flink.streaming.api.invokable.operator.BatchGroupReduceInvokable;
@@ -118,8 +120,8 @@ public class BatchedDataStream {
 	 */
 	public SingleOutputStreamOperator reduceGroup(GroupReduceFunction reducer) {
 		return dataStream.addFunction("batchReduce", reducer, new FunctionTypeWrapper(reducer,
-				GroupReduceFunction.class, 0), new FunctionTypeWrapper(reducer, GroupReduceFunction.class,
-				1), getGroupReduceInvokable(reducer));
+				GroupReduceFunction.class, 0), new FunctionTypeWrapper(reducer,
+				GroupReduceFunction.class, 1), getGroupReduceInvokable(reducer));
 	}
 
 	/**
@@ -160,6 +162,38 @@ public class BatchedDataStream {
 	}
 
 	/**
+	 * Applies an aggregation that gives the minimum element of every sliding
+	 * batch/window of the data stream by the given position. If several elements
+	 * have the same minimum value, the operator returns the first element by
+	 * default.
+	 *
+	 * @param positionToMinBy
+	 *            The position in the data point to minimize
+	 * @return The transformed DataStream.
+	 */
+	public SingleOutputStreamOperator minBy(int positionToMinBy) {
+		return this.minBy(positionToMinBy, true);
+	}
+
+	/**
+	 * Applies an aggregation that gives the minimum element of every sliding
+	 * batch/window of the data stream by the given position. If several elements
+	 * have the same minimum value, the operator returns either the first or last
+	 * one depending on the parameter setting.
+	 *
+	 * @param positionToMinBy
+	 *            The position in the data point to minimize
+	 * @param first
+	 *            If true, the operator returns the first element with the
+	 *            minimum value; otherwise it returns the last one
+	 * @return The transformed DataStream.
+	 */
+	public SingleOutputStreamOperator minBy(int positionToMinBy, boolean first) {
+		dataStream.checkFieldRange(positionToMinBy);
+		return aggregate(new MinByAggregationFunction(positionToMinBy, first));
+	}
+
+	/**
 	 * Syntactic sugar for min(0)
 	 *
 	 * @return The transformed DataStream.
@@ -182,6 +216,37 @@ public class BatchedDataStream {
 	}
 
 	/**
+	 * Applies an aggregation that gives the maximum element of every sliding
+	 * batch/window of the data stream by the given position. If several elements
+	 * have the same maximum value, the operator returns the first by default.
+	 *
+	 * @param positionToMaxBy
+	 *            The position in the data point to maximize
+	 * @return The transformed DataStream.
+	 */
+	public SingleOutputStreamOperator maxBy(int positionToMaxBy) {
+		return this.maxBy(positionToMaxBy, true);
+	}
+
+	/**
+	 * Applies an aggregation that gives the maximum element of every sliding
+	 * batch/window of the data stream by the given position. If several elements
+	 * have the same maximum value, the operator returns either the first or last
+	 * one depending on the parameter setting.
+	 *
+	 * @param positionToMaxBy
+	 *            The position in the data point to maximize
+	 * @param first
+	 *            If true, the operator returns the first element with the
+	 *            maximum value; otherwise it returns the last one
+	 * @return The transformed DataStream.
+	 */
+	public SingleOutputStreamOperator maxBy(int positionToMaxBy, boolean first) {
+		dataStream.checkFieldRange(positionToMaxBy);
+		return aggregate(new MaxByAggregationFunction(positionToMaxBy, first));
+	}
+
+	/**
 	 * Syntactic sugar for max(0)
 	 *
 	 * @return The transformed DataStream.
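The running behaviour that the new `minBy`/`maxBy` operators implement (keep the current extremal tuple; on a tie, keep it when `first` is true, otherwise replace it with the newer tuple) can be traced with a small standalone sketch. This is not Flink code: the class `MinMaxBySketch`, its helpers and the `int[]` tuples are hypothetical, and the input (i % 3, i) for i = 0..8 is an assumption reconstructed from the expected lists in `AggregationFunctionTest`, because the body of `getInputList()` is not shown here.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical standalone sketch, not Flink code: models the running minBy/maxBy
// reduction on two-field tuples represented as int[] {f0, f1}.
public class MinMaxBySketch {

    // Mirrors isExtremal: true if the current aggregate should be kept over the
    // incoming element. With first == true a tie keeps the current (earlier) element.
    static boolean keepCurrent(int current, int incoming, boolean max, boolean first) {
        int cmp = Integer.compare(current, incoming);
        if (max) {
            return first ? cmp >= 0 : cmp > 0;
        }
        return first ? cmp <= 0 : cmp < 0;
    }

    // Emits the current extremal tuple (compared on field pos) after every input element.
    static List<int[]> runningBy(List<int[]> input, int pos, boolean max, boolean first) {
        List<int[]> out = new ArrayList<>();
        int[] current = null;
        for (int[] t : input) {
            if (current == null || !keepCurrent(current[pos], t[pos], max, first)) {
                current = t;
            }
            out.add(current);
        }
        return out;
    }

    // Assumed input: (i % 3, i) for i = 0..8, reconstructed from the test's expected lists.
    static List<int[]> sampleInput() {
        List<int[]> input = new ArrayList<>();
        for (int i = 0; i < 9; i++) {
            input.add(new int[] {i % 3, i});
        }
        return input;
    }

    // Renders tuples as "(f0,f1)(f0,f1)...".
    static String show(List<int[]> tuples) {
        StringBuilder sb = new StringBuilder();
        for (int[] t : tuples) {
            sb.append('(').append(t[0]).append(',').append(t[1]).append(')');
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        List<int[]> input = sampleInput();
        System.out.println("maxBy first: " + show(runningBy(input, 0, true, true)));
        System.out.println("maxBy last:  " + show(runningBy(input, 0, true, false)));
        System.out.println("minBy first: " + show(runningBy(input, 0, false, true)));
        System.out.println("minBy last:  " + show(runningBy(input, 0, false, false)));
    }
}
```

Here `keepCurrent` plays the role of `isExtremal`, with the current aggregate as `tuple1` and the incoming element as `tuple2`. Under the assumed input, the four printed sequences match `maxByFirstExpected`, `maxByLastExpected`, `minByFirstExpected` and `minByLastExpected`; for example, `maxBy last` ends with (2,5)(2,5)(2,5)(2,8).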
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/70464bb0/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index 423de4b..8ff8c54 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -39,7 +39,9 @@ import org.apache.flink.streaming.api.JobGraphBuilder;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.function.aggregation.AggregationFunction;
 import org.apache.flink.streaming.api.function.aggregation.MaxAggregationFunction;
+import org.apache.flink.streaming.api.function.aggregation.MaxByAggregationFunction;
 import org.apache.flink.streaming.api.function.aggregation.MinAggregationFunction;
+import org.apache.flink.streaming.api.function.aggregation.MinByAggregationFunction;
 import org.apache.flink.streaming.api.function.aggregation.SumAggregationFunction;
 import org.apache.flink.streaming.api.function.sink.PrintSinkFunction;
 import org.apache.flink.streaming.api.function.sink.SinkFunction;
@@ -553,6 +555,38 @@ public class DataStream {
 	}
 
 	/**
+	 * Applies an aggregation that gives the current element with the
+	 * minimum value at the given position. If more elements have the minimum
+	 * value at the given position, the operator returns the first one by
+	 * default.
+	 *
+	 * @param positionToMinBy
+	 *            The position in the data point to minimize
+	 * @return The transformed DataStream.
+	 */
+	public SingleOutputStreamOperator minBy(int positionToMinBy) {
+		return this.minBy(positionToMinBy, true);
+	}
+
+	/**
+	 * Applies an aggregation that gives the current element with the
+	 * minimum value at the given position. If more elements have the minimum
+	 * value at the given position, the operator returns either the first or
+	 * last one, depending on the parameter setting.
+	 *
+	 * @param positionToMinBy
+	 *            The position in the data point to minimize
+	 * @param first
+	 *            If true, the operator returns the first element with the
+	 *            minimal value; otherwise it returns the last one
+	 * @return The transformed DataStream.
+	 */
+	public SingleOutputStreamOperator minBy(int positionToMinBy, boolean first) {
+		checkFieldRange(positionToMinBy);
+		return aggregate(new MinByAggregationFunction(positionToMinBy, first));
+	}
+
+	/**
 	 * Syntactic sugar for min(0)
 	 *
 	 * @return The transformed DataStream.
@@ -575,6 +609,38 @@ public class DataStream {
 	}
 
 	/**
+	 * Applies an aggregation that gives the current element with the
+	 * maximum value at the given position. If more elements have the maximum
+	 * value at the given position, the operator returns the first one by
+	 * default.
+	 *
+	 * @param positionToMaxBy
+	 *            The position in the data point to maximize
+	 * @return The transformed DataStream.
+	 */
+	public SingleOutputStreamOperator maxBy(int positionToMaxBy) {
+		return this.maxBy(positionToMaxBy, true);
+	}
+
+	/**
+	 * Applies an aggregation that gives the current element with the
+	 * maximum value at the given position. If more elements have the maximum
+	 * value at the given position, the operator returns either the first or
+	 * last one, depending on the parameter setting.
+	 *
+	 * @param positionToMaxBy
+	 *            The position in the data point to maximize.
+	 * @param first
+	 *            If true, the operator returns the first element with the
+	 *            maximum value; otherwise it returns the last one
+	 * @return The transformed DataStream.
+	 */
+	public SingleOutputStreamOperator maxBy(int positionToMaxBy, boolean first) {
+		checkFieldRange(positionToMaxBy);
+		return aggregate(new MaxByAggregationFunction(positionToMaxBy, first));
+	}
+
+	/**
 	 * Syntactic sugar for max(0)
 	 *
 	 * @return The transformed DataStream.

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/70464bb0/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
index 3a61a35..af2f186 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
@@ -75,6 +75,7 @@ public class GroupedDataStream extends DataStream {
 	 *            The position in the data point to sum
 	 * @return The transformed DataStream.
 	 */
+	@Override
 	public SingleOutputStreamOperator sum(final int positionToSum) {
 		return super.sum(positionToSum);
 	}
@@ -88,11 +89,46 @@ public class GroupedDataStream extends DataStream {
 	 *            The position in the data point to minimize
 	 * @return The transformed DataStream.
 	 */
+	@Override
 	public SingleOutputStreamOperator min(final int positionToMin) {
 		return super.min(positionToMin);
 	}
 
 	/**
+	 * Applies an aggregation that gives the current element with the
+	 * minimum value at the given position for each group on a grouped data
+	 * stream. If more elements have the minimum value at the given position,
+	 * the operator returns the first one by default.
+	 *
+	 * @param positionToMinBy
+	 *            The position in the data point to minimize
+	 * @return The transformed DataStream.
+	 */
+	@Override
+	public SingleOutputStreamOperator minBy(int positionToMinBy) {
+		return super.minBy(positionToMinBy);
+	}
+
+	/**
+	 * Applies an aggregation that gives the current element with the
+	 * minimum value at the given position for each group on a grouped data
+	 * stream. If more elements have the minimum value at the given position,
+	 * the operator returns either the first or last one depending on the
+	 * {@code first} parameter.
+	 *
+	 * @param positionToMinBy
+	 *            The position in the data point to minimize
+	 * @param first
+	 *            If true, the operator returns the first element with the
+	 *            minimum value; otherwise it returns the last one
+	 * @return The transformed DataStream.
+	 */
+	@Override
+	public SingleOutputStreamOperator minBy(int positionToMinBy, boolean first) {
+		return super.minBy(positionToMinBy, first);
+	}
+
+	/**
 	 * Applies an aggregation that gives the maximum of the grouped data stream
 	 * at the given position, grouped by the given key position. Input values
 	 * with the same key will be maximized.
@@ -101,10 +137,45 @@ public class GroupedDataStream extends DataStream {
 	 *            The position in the data point to maximize
 	 * @return The transformed DataStream.
 	 */
+	@Override
 	public SingleOutputStreamOperator max(final int positionToMax) {
 		return super.max(positionToMax);
 	}
 
+	/**
+	 * Applies an aggregation that gives the current element with the
+	 * maximum value at the given position for each group on a grouped data
+	 * stream. If more elements have the maximum value at the given position,
+	 * the operator returns the first one by default.
+	 *
+	 * @param positionToMaxBy
+	 *            The position in the data point to maximize
+	 * @return The transformed DataStream.
+	 */
+	@Override
+	public SingleOutputStreamOperator maxBy(int positionToMaxBy) {
+		return super.maxBy(positionToMaxBy);
+	}
+
+	/**
+	 * Applies an aggregation that gives the current element with the
+	 * maximum value at the given position for each group on a grouped data
+	 * stream. If more elements have the maximum value at the given position,
+	 * the operator returns either the first or last one depending on the
+	 * {@code first} parameter.
+	 *
+	 * @param positionToMaxBy
+	 *            The position in the data point to maximize
+	 * @param first
+	 *            If true, the operator returns the first element with the
+	 *            maximum value; otherwise it returns the last one
+	 * @return The transformed DataStream.
+	 */
+	@Override
+	public SingleOutputStreamOperator maxBy(int positionToMaxBy, boolean first) {
+		return super.maxBy(positionToMaxBy, first);
+	}
+
 	@Override
 	protected SingleOutputStreamOperator aggregate(AggregationFunction aggregate) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/70464bb0/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/MaxByAggregationFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/MaxByAggregationFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/MaxByAggregationFunction.java
new file mode 100644
index 0000000..274c8b6
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/MaxByAggregationFunction.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.function.aggregation;
+
+public class MaxByAggregationFunction extends MinByAggregationFunction {
+
+	private static final long serialVersionUID = 1L;
+
+	public MaxByAggregationFunction(int pos, boolean first) {
+		super(pos, first);
+	}
+
+	@Override
+	public boolean isExtremal(Comparable o1, R o2) {
+		if (first) {
+			return o1.compareTo(o2) >= 0;
+		} else {
+			return o1.compareTo(o2) > 0;
+		}
+
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/70464bb0/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/MinByAggregationFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/MinByAggregationFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/MinByAggregationFunction.java
new file mode 100644
index 0000000..a4a328c
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/MinByAggregationFunction.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.function.aggregation;
+
+import org.apache.flink.api.java.tuple.Tuple;
+
+public class MinByAggregationFunction extends ComparableAggregationFunction {
+
+	private static final long serialVersionUID = 1L;
+	protected boolean first;
+
+	public MinByAggregationFunction(int pos, boolean first) {
+		super(pos);
+		this.first = first;
+	}
+
+	@Override
+	public void compare(Tuple tuple1, Tuple tuple2) throws InstantiationException,
+			IllegalAccessException {
+
+		Comparable o1 = tuple1.getField(position);
+		R o2 = tuple2.getField(position);
+
+		if (isExtremal(o1, o2)) {
+			returnTuple = tuple1;
+		} else {
+			returnTuple = tuple2;
+		}
+	}
+
+	@Override
+	public boolean isExtremal(Comparable o1, R o2) {
+		if (first) {
+			return o1.compareTo(o2) <= 0;
+		} else {
+			return o1.compareTo(o2) < 0;
+		}
+
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/70464bb0/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java
index d48f8ad..1f86ce1 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java
@@ -26,7 +26,9 @@ import java.util.List;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.function.aggregation.MaxAggregationFunction;
+import org.apache.flink.streaming.api.function.aggregation.MaxByAggregationFunction;
 import org.apache.flink.streaming.api.function.aggregation.MinAggregationFunction;
+import org.apache.flink.streaming.api.function.aggregation.MinByAggregationFunction;
 import org.apache.flink.streaming.api.function.aggregation.SumAggregationFunction;
 import org.apache.flink.streaming.api.invokable.operator.GroupedReduceInvokable;
 import org.apache.flink.streaming.api.invokable.operator.StreamReduceInvokable;
@@ -49,7 +51,7 @@ public class AggregationFunctionTest {
 		List> expectedGroupMaxList = new ArrayList>();
 
 		List simpleInput = new ArrayList();
-		
+
 		int groupedSum0 = 0;
 		int groupedSum1 = 0;
 		int groupedSum2 = 0;
@@ -86,16 +88,14 @@ public class AggregationFunctionTest {
 		SumAggregationFunction> sumFunction = SumAggregationFunction
 				.getSumFunction(1, Integer.class);
 		@SuppressWarnings("unchecked")
-		SumAggregationFunction sumFunction0 = SumAggregationFunction
-				.getSumFunction(0, Integer.class);
+		SumAggregationFunction sumFunction0 = SumAggregationFunction.getSumFunction(0,
+				Integer.class);
 		MinAggregationFunction> minFunction = new MinAggregationFunction>(
 				1);
-		MinAggregationFunction minFunction0 = new MinAggregationFunction(
-				0);
+		MinAggregationFunction minFunction0 = new MinAggregationFunction(0);
 		MaxAggregationFunction> maxFunction = new MaxAggregationFunction>(
 				1);
-		MaxAggregationFunction maxFunction0 = new MaxAggregationFunction(
-				0);
+		MaxAggregationFunction maxFunction0 = new MaxAggregationFunction(0);
 
 		List> sumList = MockInvokable.createAndExecute(
 				new StreamReduceInvokable>(sumFunction), getInputList());
@@ -107,13 +107,16 @@ public class AggregationFunctionTest {
 				new StreamReduceInvokable>(maxFunction), getInputList());
 
 		List> groupedSumList = MockInvokable.createAndExecute(
-				new GroupedReduceInvokable>(sumFunction, 0), getInputList());
+				new GroupedReduceInvokable>(sumFunction, 0),
+				getInputList());
 
 		List> groupedMinList = MockInvokable.createAndExecute(
-				new GroupedReduceInvokable>(minFunction, 0), getInputList());
+				new GroupedReduceInvokable>(minFunction, 0),
+				getInputList());
 
 		List> groupedMaxList = MockInvokable.createAndExecute(
-				new GroupedReduceInvokable>(maxFunction, 0), getInputList());
+				new GroupedReduceInvokable>(maxFunction, 0),
+				getInputList());
 
 		assertEquals(expectedSumList, sumList);
 		assertEquals(expectedMinList, minList);
@@ -121,31 +124,171 @@ public class AggregationFunctionTest {
 		assertEquals(expectedGroupSumList, groupedSumList);
 		assertEquals(expectedGroupMinList, groupedMinList);
 		assertEquals(expectedGroupMaxList, groupedMaxList);
-		assertEquals(expectedSumList0, MockInvokable.createAndExecute(new StreamReduceInvokable(sumFunction0),simpleInput ));
-		assertEquals(expectedMinList0, MockInvokable.createAndExecute(new StreamReduceInvokable(minFunction0),simpleInput ));
-		assertEquals(expectedMaxList0, MockInvokable.createAndExecute(new StreamReduceInvokable(maxFunction0),simpleInput ));
-		
+		assertEquals(expectedSumList0, MockInvokable.createAndExecute(
+				new StreamReduceInvokable(sumFunction0), simpleInput));
+		assertEquals(expectedMinList0, MockInvokable.createAndExecute(
+				new StreamReduceInvokable(minFunction0), simpleInput));
+		assertEquals(expectedMaxList0, MockInvokable.createAndExecute(
+				new StreamReduceInvokable(maxFunction0), simpleInput));
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
 		try {
 			env.generateSequence(1, 100).min(1);
 			fail();
 		} catch (Exception e) {
-			//Nothing to do here
+			// Nothing to do here
 		}
 
 		try {
 			env.generateSequence(1, 100).min(2);
 			fail();
 		} catch (Exception e) {
-			//Nothing to do here
+			// Nothing to do here
 		}
 
 		try {
 			env.generateSequence(1, 100).min(3);
 			fail();
 		} catch (Exception e) {
-			//Nothing to do here
+			// Nothing to do here
 		}
+		MaxByAggregationFunction> maxByFunctionFirst = new MaxByAggregationFunction>(
+				0, true);
+		MaxByAggregationFunction> maxByFunctionLast = new MaxByAggregationFunction>(
+				0, false);
+
+		MinByAggregationFunction> minByFunctionFirst = new MinByAggregationFunction>(
+				0, true);
+		MinByAggregationFunction> minByFunctionLast = new MinByAggregationFunction>(
+				0, false);
+
+		List> maxByFirstExpected = new ArrayList>();
+		maxByFirstExpected.add(new Tuple2(0, 0));
+		maxByFirstExpected.add(new Tuple2(1, 1));
+		maxByFirstExpected.add(new Tuple2(2, 2));
+		maxByFirstExpected.add(new Tuple2(2, 2));
+		maxByFirstExpected.add(new Tuple2(2, 2));
+		maxByFirstExpected.add(new Tuple2(2, 2));
+		maxByFirstExpected.add(new Tuple2(2, 2));
+		maxByFirstExpected.add(new Tuple2(2, 2));
+		maxByFirstExpected.add(new Tuple2(2, 2));
+
+		List> maxByLastExpected = new ArrayList>();
+		maxByLastExpected.add(new Tuple2(0, 0));
+		maxByLastExpected.add(new Tuple2(1, 1));
+		maxByLastExpected.add(new Tuple2(2, 2));
+		maxByLastExpected.add(new Tuple2(2, 2));
+		maxByLastExpected.add(new Tuple2(2, 2));
+		maxByLastExpected.add(new Tuple2(2, 5));
+		maxByLastExpected.add(new Tuple2(2, 5));
+		maxByLastExpected.add(new Tuple2(2, 5));
+		maxByLastExpected.add(new Tuple2(2, 8));
+
+		List> minByFirstExpected = new ArrayList>();
+		minByFirstExpected.add(new Tuple2(0, 0));
+		minByFirstExpected.add(new Tuple2(0, 0));
+		minByFirstExpected.add(new Tuple2(0, 0));
+		minByFirstExpected.add(new Tuple2(0, 0));
+		minByFirstExpected.add(new Tuple2(0, 0));
+		minByFirstExpected.add(new Tuple2(0, 0));
+		minByFirstExpected.add(new Tuple2(0, 0));
+		minByFirstExpected.add(new Tuple2(0, 0));
+		minByFirstExpected.add(new Tuple2(0, 0));
+
+		List> minByLastExpected = new ArrayList>();
+		minByLastExpected.add(new Tuple2(0, 0));
+		minByLastExpected.add(new Tuple2(0, 0));
+		minByLastExpected.add(new Tuple2(0, 0));
+		minByLastExpected.add(new Tuple2(0, 3));
+		minByLastExpected.add(new Tuple2(0, 3));
+		minByLastExpected.add(new Tuple2(0, 3));
+		minByLastExpected.add(new Tuple2(0, 6));
+		minByLastExpected.add(new Tuple2(0, 6));
+		minByLastExpected.add(new Tuple2(0, 6));
+
+		assertEquals(maxByFirstExpected, MockInvokable.createAndExecute(
+				new StreamReduceInvokable>(maxByFunctionFirst),
+				getInputList()));
+		assertEquals(maxByLastExpected, MockInvokable.createAndExecute(
+				new StreamReduceInvokable>(maxByFunctionLast),
+				getInputList()));
+		assertEquals(minByLastExpected, MockInvokable.createAndExecute(
+				new StreamReduceInvokable>(minByFunctionLast),
+				getInputList()));
+		assertEquals(minByFirstExpected, MockInvokable.createAndExecute(
+				new StreamReduceInvokable>(minByFunctionFirst),
+				getInputList()));
+
+	}
+
+	@Test
+	public void minMaxByTest() {
+
+		MaxByAggregationFunction> maxByFunctionFirst = new MaxByAggregationFunction>(
+				0, true);
+		MaxByAggregationFunction> maxByFunctionLast = new MaxByAggregationFunction>(
+				0, false);
+
+		MinByAggregationFunction> minByFunctionFirst = new MinByAggregationFunction>(
+				0, true);
+		MinByAggregationFunction> minByFunctionLast = new MinByAggregationFunction>(
+				0, false);
+
+		List> maxByFirstExpected = new ArrayList>();
+		maxByFirstExpected.add(new Tuple2(0, 0));
+		maxByFirstExpected.add(new Tuple2(1, 1));
+		maxByFirstExpected.add(new Tuple2(2, 2));
+		maxByFirstExpected.add(new Tuple2(2, 2));
+		maxByFirstExpected.add(new Tuple2(2, 2));
+		maxByFirstExpected.add(new Tuple2(2, 2));
+		maxByFirstExpected.add(new Tuple2(2, 2));
+		maxByFirstExpected.add(new Tuple2(2, 2));
+		maxByFirstExpected.add(new Tuple2(2, 2));
+
+		List> maxByLastExpected = new ArrayList>();
+		maxByLastExpected.add(new Tuple2(0, 0));
+		maxByLastExpected.add(new Tuple2(1, 1));
+		maxByLastExpected.add(new Tuple2(2, 2));
+		maxByLastExpected.add(new Tuple2(2, 2));
+		maxByLastExpected.add(new Tuple2(2, 2));
+		maxByLastExpected.add(new Tuple2(2, 5));
+		maxByLastExpected.add(new Tuple2(2, 5));
+		maxByLastExpected.add(new Tuple2(2, 5));
+		maxByLastExpected.add(new Tuple2(2, 8));
+
+		List> minByFirstExpected = new ArrayList>();
+		minByFirstExpected.add(new Tuple2(0, 0));
+		minByFirstExpected.add(new Tuple2(0, 0));
+		minByFirstExpected.add(new Tuple2(0, 0));
+		minByFirstExpected.add(new Tuple2(0, 0));
+		minByFirstExpected.add(new Tuple2(0, 0));
+		minByFirstExpected.add(new Tuple2(0, 0));
+		minByFirstExpected.add(new Tuple2(0, 0));
+		minByFirstExpected.add(new Tuple2(0, 0));
+		minByFirstExpected.add(new Tuple2(0, 0));
+
+		List> minByLastExpected = new ArrayList>();
+		minByLastExpected.add(new Tuple2(0, 0));
+		minByLastExpected.add(new Tuple2(0, 0));
+		minByLastExpected.add(new Tuple2(0, 0));
+		minByLastExpected.add(new Tuple2(0, 3));
+		minByLastExpected.add(new Tuple2(0, 3));
+		minByLastExpected.add(new Tuple2(0, 3));
+		minByLastExpected.add(new Tuple2(0, 6));
+		minByLastExpected.add(new Tuple2(0, 6));
+		minByLastExpected.add(new Tuple2(0, 6));
+
+		assertEquals(maxByFirstExpected, MockInvokable.createAndExecute(
+				new StreamReduceInvokable>(maxByFunctionFirst),
+				getInputList()));
+		assertEquals(maxByLastExpected, MockInvokable.createAndExecute(
+				new StreamReduceInvokable>(maxByFunctionLast),
+				getInputList()));
+		assertEquals(minByLastExpected, MockInvokable.createAndExecute(
+				new StreamReduceInvokable>(minByFunctionLast),
+				getInputList()));
+		assertEquals(minByFirstExpected, MockInvokable.createAndExecute(
+				new StreamReduceInvokable>(minByFunctionFirst),
+				getInputList()));
 	}
 
 	private List> getInputList() {