flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mbala...@apache.org
Subject [01/12] git commit: [FLINK-1121] [streaming] minBy and maxBy operators added to streaming api
Date Wed, 24 Sep 2014 19:51:34 GMT
Repository: incubator-flink
Updated Branches:
  refs/heads/master a3b02840d -> cb81319d9


[FLINK-1121] [streaming] minBy and maxBy operators added to streaming api


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/70464bb0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/70464bb0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/70464bb0

Branch: refs/heads/master
Commit: 70464bb0a44f682c155fdfdd2a6b0a6cc1203663
Parents: 30ac9fe
Author: Gyula Fora <gyfora@apache.org>
Authored: Wed Sep 24 16:57:36 2014 +0200
Committer: mbalassi <balassi.marton@gmail.com>
Committed: Wed Sep 24 19:54:39 2014 +0200

----------------------------------------------------------------------
 docs/streaming_guide.md                         |   6 +-
 .../api/datastream/BatchedDataStream.java       |  69 +++++++-
 .../streaming/api/datastream/DataStream.java    |  66 +++++++
 .../api/datastream/GroupedDataStream.java       |  71 ++++++++
 .../aggregation/MaxByAggregationFunction.java   |  37 ++++
 .../aggregation/MinByAggregationFunction.java   |  55 ++++++
 .../streaming/api/AggregationFunctionTest.java  | 177 +++++++++++++++++--
 7 files changed, 460 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/70464bb0/docs/streaming_guide.md
----------------------------------------------------------------------
diff --git a/docs/streaming_guide.md b/docs/streaming_guide.md
index 27a32ba..37ff90d 100644
--- a/docs/streaming_guide.md
+++ b/docs/streaming_guide.md
@@ -246,9 +246,11 @@ When the reduce operator is applied on a grouped data stream, the user-defined
`
 
 The Flink Streaming API supports different types of aggregation operators similarly to the
core API. For grouped data streams the aggregations work in a grouped fashion.
 
-Types of aggregations: `sum(fieldPosition)`, `min(fieldPosition)`, `max(fieldPosition)`
+Types of aggregations: `sum(fieldPosition)`, `min(fieldPosition)`, `max(fieldPosition)`,
`minBy(fieldPosition, first)`, `maxBy(fieldPosition, first)`
 
-For every incoming tuple the selected field is replaced with the current aggregated value.
If the aggregations are used without defining field position, position `0` is used as default.

+With `sum`, `min`, and `max`, for every incoming tuple the selected field is replaced with
the current aggregated value. If the aggregations are used without defining a field position,
position `0` is used as default.
+
+With `minBy` and `maxBy`, the output of the operator is the element with the current minimal
or maximal value at the given field position. If multiple elements share the minimum or maximum
value, the user can decide whether the operator should return the first or the last one. This is
set by the `first` boolean parameter; if it is omitted, the first element is returned.
 
 ### Window/Batch operators
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/70464bb0/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/BatchedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/BatchedDataStream.java
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/BatchedDataStream.java
index 51f1467..e8e3f31 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/BatchedDataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/BatchedDataStream.java
@@ -24,7 +24,9 @@ import org.apache.flink.api.common.functions.RichReduceFunction;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.streaming.api.function.aggregation.AggregationFunction;
 import org.apache.flink.streaming.api.function.aggregation.MaxAggregationFunction;
+import org.apache.flink.streaming.api.function.aggregation.MaxByAggregationFunction;
 import org.apache.flink.streaming.api.function.aggregation.MinAggregationFunction;
+import org.apache.flink.streaming.api.function.aggregation.MinByAggregationFunction;
 import org.apache.flink.streaming.api.function.aggregation.SumAggregationFunction;
 import org.apache.flink.streaming.api.invokable.StreamInvokable;
 import org.apache.flink.streaming.api.invokable.operator.BatchGroupReduceInvokable;
@@ -118,8 +120,8 @@ public class BatchedDataStream<OUT> {
 	 */
 	public <R> SingleOutputStreamOperator<R, ?> reduceGroup(GroupReduceFunction<OUT,
R> reducer) {
 		return dataStream.addFunction("batchReduce", reducer, new FunctionTypeWrapper<OUT>(reducer,
-				GroupReduceFunction.class, 0), new FunctionTypeWrapper<R>(reducer, GroupReduceFunction.class,
-				1), getGroupReduceInvokable(reducer));
+				GroupReduceFunction.class, 0), new FunctionTypeWrapper<R>(reducer,
+				GroupReduceFunction.class, 1), getGroupReduceInvokable(reducer));
 	}
 
 	/**
@@ -160,6 +162,38 @@ public class BatchedDataStream<OUT> {
 	}
 
 	/**
+	 * Applies an aggregation that gives the minimum element of every sliding
+	 * batch/window of the data stream by the given position. If more elements
+	 * have the same minimum value the operator returns the first element by
+	 * default.
+	 * 
+	 * @param positionToMinBy
+	 *            The position in the data point to minimize
+	 * @return The transformed DataStream.
+	 */
+	public SingleOutputStreamOperator<OUT, ?> minBy(int positionToMinBy) {
+		return this.minBy(positionToMinBy, true);
+	}
+
+	/**
+	 * Applies an aggregation that gives the minimum element of every sliding
+	 * batch/window of the data stream by the given position. If multiple elements
+	 * have the same minimum value, the operator returns either the first or the
+	 * last one, depending on the {@code first} parameter.
+	 * 
+	 * @param positionToMinBy
+	 *            The position in the data point to minimize
+	 * @param first
+	 *            If true, the operator returns the first element with the
+	 *            minimum value, otherwise it returns the last one
+	 * @return The transformed DataStream.
+	 */
+	public SingleOutputStreamOperator<OUT, ?> minBy(int positionToMinBy, boolean first)
{
+		dataStream.checkFieldRange(positionToMinBy);
+		return aggregate(new MinByAggregationFunction<OUT>(positionToMinBy, first));
+	}
+
+	/**
 	 * Syntactic sugar for min(0)
 	 * 
 	 * @return The transformed DataStream.
@@ -182,6 +216,37 @@ public class BatchedDataStream<OUT> {
 	}
 
 	/**
+	 * Applies an aggregation that gives the maximum element of every sliding
+	 * batch/window of the data stream by the given position. If more elements
+	 * have the same maximum value the operator returns the first by default.
+	 * 
+	 * @param positionToMaxBy
+	 *            The position in the data point to maximize
+	 * @return The transformed DataStream.
+	 */
+	public SingleOutputStreamOperator<OUT, ?> maxBy(int positionToMaxBy) {
+		return this.maxBy(positionToMaxBy, true);
+	}
+
+	/**
+	 * Applies an aggregation that gives the maximum element of every sliding
+	 * batch/window of the data stream by the given position. If multiple elements
+	 * have the same maximum value, the operator returns either the first or the
+	 * last one, depending on the {@code first} parameter.
+	 * 
+	 * @param positionToMaxBy
+	 *            The position in the data point to maximize
+	 * @param first
+	 *            If true, the operator returns the first element with the
+	 *            maximum value, otherwise it returns the last one
+	 * @return The transformed DataStream.
+	 */
+	public SingleOutputStreamOperator<OUT, ?> maxBy(int positionToMaxBy, boolean first)
{
+		dataStream.checkFieldRange(positionToMaxBy);
+		return aggregate(new MaxByAggregationFunction<OUT>(positionToMaxBy, first));
+	}
+
+	/**
 	 * Syntactic sugar for max(0)
 	 * 
 	 * @return The transformed DataStream.

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/70464bb0/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index 423de4b..8ff8c54 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -39,7 +39,9 @@ import org.apache.flink.streaming.api.JobGraphBuilder;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.function.aggregation.AggregationFunction;
 import org.apache.flink.streaming.api.function.aggregation.MaxAggregationFunction;
+import org.apache.flink.streaming.api.function.aggregation.MaxByAggregationFunction;
 import org.apache.flink.streaming.api.function.aggregation.MinAggregationFunction;
+import org.apache.flink.streaming.api.function.aggregation.MinByAggregationFunction;
 import org.apache.flink.streaming.api.function.aggregation.SumAggregationFunction;
 import org.apache.flink.streaming.api.function.sink.PrintSinkFunction;
 import org.apache.flink.streaming.api.function.sink.SinkFunction;
@@ -553,6 +555,38 @@ public class DataStream<OUT> {
 	}
 
 	/**
+	 * Applies an aggregation that gives the current element with the
+	 * minimum value at the given position. If multiple elements have the
+	 * minimum value at the given position, the operator returns the first
+	 * one by default.
+	 * 
+	 * @param positionToMinBy
+	 *            The position in the data point to minimize
+	 * @return The transformed DataStream.
+	 */
+	public SingleOutputStreamOperator<OUT, ?> minBy(int positionToMinBy) {
+		return this.minBy(positionToMinBy, true);
+	}
+
+	/**
+	 * Applies an aggregation that gives the current element with the
+	 * minimum value at the given position. If multiple elements have the
+	 * minimum value at the given position, the operator returns either the
+	 * first or the last one, depending on the {@code first} parameter.
+	 * 
+	 * @param positionToMinBy
+	 *            The position in the data point to minimize
+	 * @param first
+	 *            If true, the operator returns the first element with the
+	 *            minimal value, otherwise it returns the last one
+	 * @return The transformed DataStream.
+	 */
+	public SingleOutputStreamOperator<OUT, ?> minBy(int positionToMinBy, boolean first)
{
+		checkFieldRange(positionToMinBy);
+		return aggregate(new MinByAggregationFunction<OUT>(positionToMinBy, first));
+	}
+
+	/**
 	 * Syntactic sugar for min(0)
 	 * 
 	 * @return The transformed DataStream.
@@ -575,6 +609,38 @@ public class DataStream<OUT> {
 	}
 
 	/**
+	 * Applies an aggregation that gives the current element with the
+	 * maximum value at the given position. If multiple elements have the
+	 * maximum value at the given position, the operator returns the first
+	 * one by default.
+	 * 
+	 * @param positionToMaxBy
+	 *            The position in the data point to maximize
+	 * @return The transformed DataStream.
+	 */
+	public SingleOutputStreamOperator<OUT, ?> maxBy(int positionToMaxBy) {
+		return this.maxBy(positionToMaxBy, true);
+	}
+
+	/**
+	 * Applies an aggregation that gives the current element with the
+	 * maximum value at the given position. If multiple elements have the
+	 * maximum value at the given position, the operator returns either the
+	 * first or the last one, depending on the {@code first} parameter.
+	 * 
+	 * @param positionToMaxBy
+	 *            The position in the data point to maximize.
+	 * @param first
+	 *            If true, the operator returns the first element with the
+	 *            maximum value, otherwise it returns the last one
+	 * @return The transformed DataStream.
+	 */
+	public SingleOutputStreamOperator<OUT, ?> maxBy(int positionToMaxBy, boolean first)
{
+		checkFieldRange(positionToMaxBy);
+		return aggregate(new MaxByAggregationFunction<OUT>(positionToMaxBy, first));
+	}
+
+	/**
 	 * Syntactic sugar for max(0)
 	 * 
 	 * @return The transformed DataStream.

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/70464bb0/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
index 3a61a35..af2f186 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
@@ -75,6 +75,7 @@ public class GroupedDataStream<OUT> extends DataStream<OUT>
{
 	 *            The position in the data point to sum
 	 * @return The transformed DataStream.
 	 */
+	@Override
 	public SingleOutputStreamOperator<OUT, ?> sum(final int positionToSum) {
 		return super.sum(positionToSum);
 	}
@@ -88,11 +89,46 @@ public class GroupedDataStream<OUT> extends DataStream<OUT>
{
 	 *            The position in the data point to minimize
 	 * @return The transformed DataStream.
 	 */
+	@Override
 	public SingleOutputStreamOperator<OUT, ?> min(final int positionToMin) {
 		return super.min(positionToMin);
 	}
 
 	/**
+	 * Applies an aggregation that gives the current element with the
+	 * minimum value at the given position for each group on a grouped data
+	 * stream. If multiple elements have the minimum value at the given position,
+	 * the operator returns the first one by default.
+	 * 
+	 * @param positionToMinBy
+	 *            The position in the data point to minimize
+	 * @return The transformed DataStream.
+	 */
+	@Override
+	public SingleOutputStreamOperator<OUT, ?> minBy(int positionToMinBy) {
+		return super.minBy(positionToMinBy);
+	}
+
+	/**
+	 * Applies an aggregation that gives the current element with the
+	 * minimum value at the given position for each group on a grouped data
+	 * stream. If multiple elements have the minimum value at the given position,
+	 * the operator returns either the first or the last one, depending on the
+	 * {@code first} parameter.
+	 * 
+	 * @param positionToMinBy
+	 *            The position in the data point to minimize
+	 * @param first
+	 *            If true, the operator returns the first element with the
+	 *            minimum value, otherwise it returns the last one
+	 * @return The transformed DataStream.
+	 */
+	@Override
+	public SingleOutputStreamOperator<OUT, ?> minBy(int positionToMinBy, boolean first)
{
+		return super.minBy(positionToMinBy, first);
+	}
+
+	/**
 	 * Applies an aggregation that gives the maximum of the grouped data stream
 	 * at the given position, grouped by the given key position. Input values
 	 * with the same key will be maximized.
@@ -101,10 +137,45 @@ public class GroupedDataStream<OUT> extends DataStream<OUT>
{
 	 *            The position in the data point to maximize
 	 * @return The transformed DataStream.
 	 */
+	@Override
 	public SingleOutputStreamOperator<OUT, ?> max(final int positionToMax) {
 		return super.max(positionToMax);
 	}
 
+	/**
+	 * Applies an aggregation that gives the current element with the
+	 * maximum value at the given position for each group on a grouped data
+	 * stream. If multiple elements have the maximum value at the given position,
+	 * the operator returns the first one by default.
+	 * 
+	 * @param positionToMaxBy
+	 *            The position in the data point to maximize
+	 * @return The transformed DataStream.
+	 */
+	@Override
+	public SingleOutputStreamOperator<OUT, ?> maxBy(int positionToMaxBy) {
+		return super.maxBy(positionToMaxBy);
+	}
+
+	/**
+	 * Applies an aggregation that gives the current element with the
+	 * maximum value at the given position for each group on a grouped data
+	 * stream. If multiple elements have the maximum value at the given position,
+	 * the operator returns either the first or the last one, depending on the
+	 * {@code first} parameter.
+	 * 
+	 * @param positionToMaxBy
+	 *            The position in the data point to maximize
+	 * @param first
+	 *            If true, the operator returns the first element with the
+	 *            maximum value, otherwise it returns the last one
+	 * @return The transformed DataStream.
+	 */
+	@Override
+	public SingleOutputStreamOperator<OUT, ?> maxBy(int positionToMaxBy, boolean first)
{
+		return super.maxBy(positionToMaxBy, first);
+	}
+
 	@Override
 	protected SingleOutputStreamOperator<OUT, ?> aggregate(AggregationFunction<OUT>
aggregate) {
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/70464bb0/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/MaxByAggregationFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/MaxByAggregationFunction.java
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/MaxByAggregationFunction.java
new file mode 100644
index 0000000..274c8b6
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/MaxByAggregationFunction.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.function.aggregation;
+
+public class MaxByAggregationFunction<T> extends MinByAggregationFunction<T>
{
+
+	private static final long serialVersionUID = 1L;
+
+	public MaxByAggregationFunction(int pos, boolean first) {
+		super(pos, first);
+	}
+
+	@Override
+	public <R> boolean isExtremal(Comparable<R> o1, R o2) {
+		if (first) {
+			return o1.compareTo(o2) >= 0;
+		} else {
+			return o1.compareTo(o2) > 0;
+		}
+
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/70464bb0/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/MinByAggregationFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/MinByAggregationFunction.java
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/MinByAggregationFunction.java
new file mode 100644
index 0000000..a4a328c
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/MinByAggregationFunction.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.function.aggregation;
+
+import org.apache.flink.api.java.tuple.Tuple;
+
+public class MinByAggregationFunction<T> extends ComparableAggregationFunction<T>
{
+
+	private static final long serialVersionUID = 1L;
+	protected boolean first;
+
+	public MinByAggregationFunction(int pos, boolean first) {
+		super(pos);
+		this.first = first;
+	}
+
+	@Override
+	public <R> void compare(Tuple tuple1, Tuple tuple2) throws InstantiationException,
+			IllegalAccessException {
+
+		Comparable<R> o1 = tuple1.getField(position);
+		R o2 = tuple2.getField(position);
+
+		if (isExtremal(o1, o2)) {
+			returnTuple = tuple1;
+		} else {
+			returnTuple = tuple2;
+		}
+	}
+
+	@Override
+	public <R> boolean isExtremal(Comparable<R> o1, R o2) {
+		if (first) {
+			return o1.compareTo(o2) <= 0;
+		} else {
+			return o1.compareTo(o2) < 0;
+		}
+
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/70464bb0/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java
b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java
index d48f8ad..1f86ce1 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java
@@ -26,7 +26,9 @@ import java.util.List;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.function.aggregation.MaxAggregationFunction;
+import org.apache.flink.streaming.api.function.aggregation.MaxByAggregationFunction;
 import org.apache.flink.streaming.api.function.aggregation.MinAggregationFunction;
+import org.apache.flink.streaming.api.function.aggregation.MinByAggregationFunction;
 import org.apache.flink.streaming.api.function.aggregation.SumAggregationFunction;
 import org.apache.flink.streaming.api.invokable.operator.GroupedReduceInvokable;
 import org.apache.flink.streaming.api.invokable.operator.StreamReduceInvokable;
@@ -49,7 +51,7 @@ public class AggregationFunctionTest {
 		List<Tuple2<Integer, Integer>> expectedGroupMaxList = new ArrayList<Tuple2<Integer,
Integer>>();
 
 		List<Integer> simpleInput = new ArrayList<Integer>();
-		
+
 		int groupedSum0 = 0;
 		int groupedSum1 = 0;
 		int groupedSum2 = 0;
@@ -86,16 +88,14 @@ public class AggregationFunctionTest {
 		SumAggregationFunction<Tuple2<Integer, Integer>> sumFunction = SumAggregationFunction
 				.getSumFunction(1, Integer.class);
 		@SuppressWarnings("unchecked")
-		SumAggregationFunction<Integer> sumFunction0 = SumAggregationFunction
-				.getSumFunction(0, Integer.class);
+		SumAggregationFunction<Integer> sumFunction0 = SumAggregationFunction.getSumFunction(0,
+				Integer.class);
 		MinAggregationFunction<Tuple2<Integer, Integer>> minFunction = new MinAggregationFunction<Tuple2<Integer,
Integer>>(
 				1);
-		MinAggregationFunction<Integer> minFunction0 = new MinAggregationFunction<Integer>(
-				0);
+		MinAggregationFunction<Integer> minFunction0 = new MinAggregationFunction<Integer>(0);
 		MaxAggregationFunction<Tuple2<Integer, Integer>> maxFunction = new MaxAggregationFunction<Tuple2<Integer,
Integer>>(
 				1);
-		MaxAggregationFunction<Integer> maxFunction0 = new MaxAggregationFunction<Integer>(
-				0);
+		MaxAggregationFunction<Integer> maxFunction0 = new MaxAggregationFunction<Integer>(0);
 
 		List<Tuple2<Integer, Integer>> sumList = MockInvokable.createAndExecute(
 				new StreamReduceInvokable<Tuple2<Integer, Integer>>(sumFunction), getInputList());
@@ -107,13 +107,16 @@ public class AggregationFunctionTest {
 				new StreamReduceInvokable<Tuple2<Integer, Integer>>(maxFunction), getInputList());
 
 		List<Tuple2<Integer, Integer>> groupedSumList = MockInvokable.createAndExecute(
-				new GroupedReduceInvokable<Tuple2<Integer, Integer>>(sumFunction, 0), getInputList());
+				new GroupedReduceInvokable<Tuple2<Integer, Integer>>(sumFunction, 0),
+				getInputList());
 
 		List<Tuple2<Integer, Integer>> groupedMinList = MockInvokable.createAndExecute(
-				new GroupedReduceInvokable<Tuple2<Integer, Integer>>(minFunction, 0), getInputList());
+				new GroupedReduceInvokable<Tuple2<Integer, Integer>>(minFunction, 0),
+				getInputList());
 
 		List<Tuple2<Integer, Integer>> groupedMaxList = MockInvokable.createAndExecute(
-				new GroupedReduceInvokable<Tuple2<Integer, Integer>>(maxFunction, 0), getInputList());
+				new GroupedReduceInvokable<Tuple2<Integer, Integer>>(maxFunction, 0),
+				getInputList());
 
 		assertEquals(expectedSumList, sumList);
 		assertEquals(expectedMinList, minList);
@@ -121,31 +124,171 @@ public class AggregationFunctionTest {
 		assertEquals(expectedGroupSumList, groupedSumList);
 		assertEquals(expectedGroupMinList, groupedMinList);
 		assertEquals(expectedGroupMaxList, groupedMaxList);
-		assertEquals(expectedSumList0, MockInvokable.createAndExecute(new StreamReduceInvokable<Integer>(sumFunction0),simpleInput
));
-		assertEquals(expectedMinList0, MockInvokable.createAndExecute(new StreamReduceInvokable<Integer>(minFunction0),simpleInput
));
-		assertEquals(expectedMaxList0, MockInvokable.createAndExecute(new StreamReduceInvokable<Integer>(maxFunction0),simpleInput
));
-
+		assertEquals(expectedSumList0, MockInvokable.createAndExecute(
+				new StreamReduceInvokable<Integer>(sumFunction0), simpleInput));
+		assertEquals(expectedMinList0, MockInvokable.createAndExecute(
+				new StreamReduceInvokable<Integer>(minFunction0), simpleInput));
+		assertEquals(expectedMaxList0, MockInvokable.createAndExecute(
+				new StreamReduceInvokable<Integer>(maxFunction0), simpleInput));
 
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
 		try {
 			env.generateSequence(1, 100).min(1);
 			fail();
 		} catch (Exception e) {
-			//Nothing to do here
+			// Nothing to do here
 		}
 		try {
 			env.generateSequence(1, 100).min(2);
 			fail();
 		} catch (Exception e) {
-			//Nothing to do here
+			// Nothing to do here
 		}
 		try {
 			env.generateSequence(1, 100).min(3);
 			fail();
 		} catch (Exception e) {
-			//Nothing to do here
+			// Nothing to do here
 		}
 
+		MaxByAggregationFunction<Tuple2<Integer, Integer>> maxByFunctionFirst = new
MaxByAggregationFunction<Tuple2<Integer, Integer>>(
+				0, true);
+		MaxByAggregationFunction<Tuple2<Integer, Integer>> maxByFunctionLast = new
MaxByAggregationFunction<Tuple2<Integer, Integer>>(
+				0, false);
+
+		MinByAggregationFunction<Tuple2<Integer, Integer>> minByFunctionFirst = new
MinByAggregationFunction<Tuple2<Integer, Integer>>(
+				0, true);
+		MinByAggregationFunction<Tuple2<Integer, Integer>> minByFunctionLast = new
MinByAggregationFunction<Tuple2<Integer, Integer>>(
+				0, false);
+
+		List<Tuple2<Integer, Integer>> maxByFirstExpected = new ArrayList<Tuple2<Integer,
Integer>>();
+		maxByFirstExpected.add(new Tuple2<Integer, Integer>(0, 0));
+		maxByFirstExpected.add(new Tuple2<Integer, Integer>(1, 1));
+		maxByFirstExpected.add(new Tuple2<Integer, Integer>(2, 2));
+		maxByFirstExpected.add(new Tuple2<Integer, Integer>(2, 2));
+		maxByFirstExpected.add(new Tuple2<Integer, Integer>(2, 2));
+		maxByFirstExpected.add(new Tuple2<Integer, Integer>(2, 2));
+		maxByFirstExpected.add(new Tuple2<Integer, Integer>(2, 2));
+		maxByFirstExpected.add(new Tuple2<Integer, Integer>(2, 2));
+		maxByFirstExpected.add(new Tuple2<Integer, Integer>(2, 2));
+
+		List<Tuple2<Integer, Integer>> maxByLastExpected = new ArrayList<Tuple2<Integer,
Integer>>();
+		maxByLastExpected.add(new Tuple2<Integer, Integer>(0, 0));
+		maxByLastExpected.add(new Tuple2<Integer, Integer>(1, 1));
+		maxByLastExpected.add(new Tuple2<Integer, Integer>(2, 2));
+		maxByLastExpected.add(new Tuple2<Integer, Integer>(2, 2));
+		maxByLastExpected.add(new Tuple2<Integer, Integer>(2, 2));
+		maxByLastExpected.add(new Tuple2<Integer, Integer>(2, 5));
+		maxByLastExpected.add(new Tuple2<Integer, Integer>(2, 5));
+		maxByLastExpected.add(new Tuple2<Integer, Integer>(2, 5));
+		maxByLastExpected.add(new Tuple2<Integer, Integer>(2, 8));
+
+		List<Tuple2<Integer, Integer>> minByFirstExpected = new ArrayList<Tuple2<Integer,
Integer>>();
+		minByFirstExpected.add(new Tuple2<Integer, Integer>(0, 0));
+		minByFirstExpected.add(new Tuple2<Integer, Integer>(0, 0));
+		minByFirstExpected.add(new Tuple2<Integer, Integer>(0, 0));
+		minByFirstExpected.add(new Tuple2<Integer, Integer>(0, 0));
+		minByFirstExpected.add(new Tuple2<Integer, Integer>(0, 0));
+		minByFirstExpected.add(new Tuple2<Integer, Integer>(0, 0));
+		minByFirstExpected.add(new Tuple2<Integer, Integer>(0, 0));
+		minByFirstExpected.add(new Tuple2<Integer, Integer>(0, 0));
+		minByFirstExpected.add(new Tuple2<Integer, Integer>(0, 0));
+
+		List<Tuple2<Integer, Integer>> minByLastExpected = new ArrayList<Tuple2<Integer,
Integer>>();
+		minByLastExpected.add(new Tuple2<Integer, Integer>(0, 0));
+		minByLastExpected.add(new Tuple2<Integer, Integer>(0, 0));
+		minByLastExpected.add(new Tuple2<Integer, Integer>(0, 0));
+		minByLastExpected.add(new Tuple2<Integer, Integer>(0, 3));
+		minByLastExpected.add(new Tuple2<Integer, Integer>(0, 3));
+		minByLastExpected.add(new Tuple2<Integer, Integer>(0, 3));
+		minByLastExpected.add(new Tuple2<Integer, Integer>(0, 6));
+		minByLastExpected.add(new Tuple2<Integer, Integer>(0, 6));
+		minByLastExpected.add(new Tuple2<Integer, Integer>(0, 6));
+
+		assertEquals(maxByFirstExpected, MockInvokable.createAndExecute(
+				new StreamReduceInvokable<Tuple2<Integer, Integer>>(maxByFunctionFirst),
+				getInputList()));
+		assertEquals(maxByLastExpected, MockInvokable.createAndExecute(
+				new StreamReduceInvokable<Tuple2<Integer, Integer>>(maxByFunctionLast),
+				getInputList()));
+		assertEquals(minByLastExpected, MockInvokable.createAndExecute(
+				new StreamReduceInvokable<Tuple2<Integer, Integer>>(minByFunctionLast),
+				getInputList()));
+		assertEquals(minByFirstExpected, MockInvokable.createAndExecute(
+				new StreamReduceInvokable<Tuple2<Integer, Integer>>(minByFunctionFirst),
+				getInputList()));
+
+	}
+
+	@Test
+	public void minMaxByTest() {
+
+		MaxByAggregationFunction<Tuple2<Integer, Integer>> maxByFunctionFirst = new
MaxByAggregationFunction<Tuple2<Integer, Integer>>(
+				0, true);
+		MaxByAggregationFunction<Tuple2<Integer, Integer>> maxByFunctionLast = new
MaxByAggregationFunction<Tuple2<Integer, Integer>>(
+				0, false);
+
+		MinByAggregationFunction<Tuple2<Integer, Integer>> minByFunctionFirst = new
MinByAggregationFunction<Tuple2<Integer, Integer>>(
+				0, true);
+		MinByAggregationFunction<Tuple2<Integer, Integer>> minByFunctionLast = new
MinByAggregationFunction<Tuple2<Integer, Integer>>(
+				0, false);
+
+		List<Tuple2<Integer, Integer>> maxByFirstExpected = new ArrayList<Tuple2<Integer,
Integer>>();
+		maxByFirstExpected.add(new Tuple2<Integer, Integer>(0, 0));
+		maxByFirstExpected.add(new Tuple2<Integer, Integer>(1, 1));
+		maxByFirstExpected.add(new Tuple2<Integer, Integer>(2, 2));
+		maxByFirstExpected.add(new Tuple2<Integer, Integer>(2, 2));
+		maxByFirstExpected.add(new Tuple2<Integer, Integer>(2, 2));
+		maxByFirstExpected.add(new Tuple2<Integer, Integer>(2, 2));
+		maxByFirstExpected.add(new Tuple2<Integer, Integer>(2, 2));
+		maxByFirstExpected.add(new Tuple2<Integer, Integer>(2, 2));
+		maxByFirstExpected.add(new Tuple2<Integer, Integer>(2, 2));
+
+		List<Tuple2<Integer, Integer>> maxByLastExpected = new ArrayList<Tuple2<Integer,
Integer>>();
+		maxByLastExpected.add(new Tuple2<Integer, Integer>(0, 0));
+		maxByLastExpected.add(new Tuple2<Integer, Integer>(1, 1));
+		maxByLastExpected.add(new Tuple2<Integer, Integer>(2, 2));
+		maxByLastExpected.add(new Tuple2<Integer, Integer>(2, 2));
+		maxByLastExpected.add(new Tuple2<Integer, Integer>(2, 2));
+		maxByLastExpected.add(new Tuple2<Integer, Integer>(2, 5));
+		maxByLastExpected.add(new Tuple2<Integer, Integer>(2, 5));
+		maxByLastExpected.add(new Tuple2<Integer, Integer>(2, 5));
+		maxByLastExpected.add(new Tuple2<Integer, Integer>(2, 8));
+
+		List<Tuple2<Integer, Integer>> minByFirstExpected = new ArrayList<Tuple2<Integer,
Integer>>();
+		minByFirstExpected.add(new Tuple2<Integer, Integer>(0, 0));
+		minByFirstExpected.add(new Tuple2<Integer, Integer>(0, 0));
+		minByFirstExpected.add(new Tuple2<Integer, Integer>(0, 0));
+		minByFirstExpected.add(new Tuple2<Integer, Integer>(0, 0));
+		minByFirstExpected.add(new Tuple2<Integer, Integer>(0, 0));
+		minByFirstExpected.add(new Tuple2<Integer, Integer>(0, 0));
+		minByFirstExpected.add(new Tuple2<Integer, Integer>(0, 0));
+		minByFirstExpected.add(new Tuple2<Integer, Integer>(0, 0));
+		minByFirstExpected.add(new Tuple2<Integer, Integer>(0, 0));
+
+		List<Tuple2<Integer, Integer>> minByLastExpected = new ArrayList<Tuple2<Integer,
Integer>>();
+		minByLastExpected.add(new Tuple2<Integer, Integer>(0, 0));
+		minByLastExpected.add(new Tuple2<Integer, Integer>(0, 0));
+		minByLastExpected.add(new Tuple2<Integer, Integer>(0, 0));
+		minByLastExpected.add(new Tuple2<Integer, Integer>(0, 3));
+		minByLastExpected.add(new Tuple2<Integer, Integer>(0, 3));
+		minByLastExpected.add(new Tuple2<Integer, Integer>(0, 3));
+		minByLastExpected.add(new Tuple2<Integer, Integer>(0, 6));
+		minByLastExpected.add(new Tuple2<Integer, Integer>(0, 6));
+		minByLastExpected.add(new Tuple2<Integer, Integer>(0, 6));
+
+		assertEquals(maxByFirstExpected, MockInvokable.createAndExecute(
+				new StreamReduceInvokable<Tuple2<Integer, Integer>>(maxByFunctionFirst),
+				getInputList()));
+		assertEquals(maxByLastExpected, MockInvokable.createAndExecute(
+				new StreamReduceInvokable<Tuple2<Integer, Integer>>(maxByFunctionLast),
+				getInputList()));
+		assertEquals(minByLastExpected, MockInvokable.createAndExecute(
+				new StreamReduceInvokable<Tuple2<Integer, Integer>>(minByFunctionLast),
+				getInputList()));
+		assertEquals(minByFirstExpected, MockInvokable.createAndExecute(
+				new StreamReduceInvokable<Tuple2<Integer, Integer>>(minByFunctionFirst),
+				getInputList()));
 	}
 
 	private List<Tuple2<Integer, Integer>> getInputList() {


Mime
View raw message