flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mbala...@apache.org
Subject [05/18] git commit: [FLINK-1080] [streaming] Streaming aggregation added
Date Sat, 20 Sep 2014 13:10:48 GMT
[FLINK-1080] [streaming] Streaming aggregation added

Conflicts:
	flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/0c8f1dac
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/0c8f1dac
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/0c8f1dac

Branch: refs/heads/master
Commit: 0c8f1dacc2d97b9b4786ac4f0ccf149ed6998d46
Parents: fbfcc9e
Author: Eszes Dávid <eszesdavid@gmail.com>
Authored: Tue Sep 2 15:58:14 2014 +0200
Committer: mbalassi <balassi.marton@gmail.com>
Committed: Sat Sep 20 13:42:04 2014 +0200

----------------------------------------------------------------------
 .../connectors/twitter/TwitterLocal.java        |   3 +-
 .../flink/streaming/api/JobGraphBuilder.java    |  34 ++++++
 .../api/collector/StreamCollector.java          |   3 +-
 .../streaming/api/datastream/DataStream.java    |  66 ++++++++++-
 .../api/datastream/GroupedDataStream.java       |  70 ++++++++++--
 .../ComparableAggregationFunction.java          |  64 +++++++++++
 .../StreamingAggregationFunction.java           |  45 ++++++++
 .../StreamingMaxAggregationFunction.java        |  32 ++++++
 .../StreamingMinAggregationFunction.java        |  32 ++++++
 .../StreamingSumAggregationFunction.java        |  64 +++++++++++
 .../streaming/api/AggregationFunctionTest.java  | 109 +++++++++++++++++++
 .../examples/wordcount/WordCountLocal.java      |  25 +----
 12 files changed, 510 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/0c8f1dac/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterLocal.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterLocal.java
b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterLocal.java
index 2ae765e..06a1308 100644
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterLocal.java
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterLocal.java
@@ -22,7 +22,6 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.examples.function.JSONParseFlatMap;
-import org.apache.flink.streaming.examples.wordcount.WordCountLocal.WordCountCounter;
 import org.apache.flink.util.Collector;
 import org.apache.sling.commons.json.JSONException;
 
@@ -90,7 +89,7 @@ public class TwitterLocal {
 					}
 				})
 				.groupBy(0)
-				.reduce(new WordCountCounter());
+				.sum(1);
 
 		dataStream.print();
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/0c8f1dac/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
index 7973324..cd54a54 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
@@ -48,6 +48,7 @@ import org.apache.flink.streaming.api.streamcomponent.StreamTask;
 import org.apache.flink.streaming.partitioner.ForwardPartitioner;
 import org.apache.flink.streaming.partitioner.StreamPartitioner;
 import org.apache.flink.streaming.util.serialization.TypeSerializerWrapper;
+import org.apache.flink.types.TypeInformation;
 
 /**
  * Object for building Apache Flink stream processing job graphs
@@ -322,6 +323,7 @@ public class JobGraphBuilder {
 		iterationIds.put(componentName, iterationID);
 		iterationIDtoSinkName.put(iterationID, componentName);
 		setBytesFrom(iterationTail, componentName);
+		//setInTypeWrappersFrom(iterationTail, componentName);
 		iterationWaitTime.put(iterationIDtoSinkName.get(iterationID), waitTime);
 
 		if (LOG.isDebugEnabled()) {
@@ -602,11 +604,66 @@ public class JobGraphBuilder {
 		operatorNames.put(to, operatorNames.get(from));
 		serializedFunctions.put(to, serializedFunctions.get(from));
 
+		setTypeWrappersFrom(from, to);
+	}
+
+	/**
+	 * Makes component {@code to} use the output type wrappers of component
+	 * {@code from} as both its input and its output type wrappers.
+	 * 
+	 * @param from
+	 *            ID of the component whose output type wrappers are copied
+	 * @param to
+	 *            ID of the component whose type wrappers are overwritten
+	 */
+	public void setTypeWrappersFrom(String from, String to) {
+		setInToOutTypeWrappersFrom(from, to);
+		setOutToOutTypeWrappersFrom(from, to);
+	}
+
+	/**
+	 * Sets the input type wrappers of component {@code to} to the output type
+	 * wrappers of component {@code from}.
+	 */
+	public void setInToOutTypeWrappersFrom(String from, String to) {
 		typeWrapperIn1.put(to, typeWrapperOut1.get(from));
 		typeWrapperIn2.put(to, typeWrapperOut2.get(from));
+	}
+
+	/**
+	 * Sets the output type wrappers of component {@code to} to the output
+	 * type wrappers of component {@code from}.
+	 */
+	public void setOutToOutTypeWrappersFrom(String from, String to) {
 		typeWrapperOut1.put(to, typeWrapperOut1.get(from));
 		typeWrapperOut2.put(to, typeWrapperOut2.get(from));
 	}
+
+	/**
+	 * Sets the input type wrappers of component {@code to} to the input type
+	 * wrappers of component {@code from}.
+	 */
+	public void setInToInTypeWrappersFrom(String from, String to) {
+		typeWrapperIn1.put(to, typeWrapperIn1.get(from));
+		typeWrapperIn2.put(to, typeWrapperIn2.get(from));
+	}
+
+	/**
+	 * Returns the type information held by the first input type wrapper of
+	 * the given component.
+	 */
+	public TypeInformation<?> getInTypeInfo(String id) {
+		return typeWrapperIn1.get(id).getTypeInfo();
+	}
+
+	/**
+	 * Returns the type information held by the first output type wrapper of
+	 * the given component.
+	 */
+	public TypeInformation<?> getOutTypeInfo(String id) {
+		return typeWrapperOut1.get(id).getTypeInfo();
+	}
+
 
 	/**
 	 * Sets instance sharing between the given components

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/0c8f1dac/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamCollector.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamCollector.java
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamCollector.java
index ce4069e..54cab72 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamCollector.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamCollector.java
@@ -59,6 +59,7 @@ public class StreamCollector<OUT> implements Collector<OUT>
{
 	public StreamCollector(int channelID,
 			SerializationDelegate<StreamRecord<OUT>> serializationDelegate) {
 		this.serializationDelegate = serializationDelegate;
+		
 		if (serializationDelegate != null) {
 			this.streamRecord = serializationDelegate.getInstance();
 		} else {
@@ -145,4 +146,4 @@ public class StreamCollector<OUT> implements Collector<OUT>
{
 	@Override
 	public void close() {
 	}
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/0c8f1dac/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index 64a07b5..9375762 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -36,6 +36,10 @@ import org.apache.flink.api.java.functions.RichMapFunction;
 import org.apache.flink.api.java.functions.RichReduceFunction;
 import org.apache.flink.streaming.api.JobGraphBuilder;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.function.aggregation.StreamingMaxAggregationFunction;
+import org.apache.flink.streaming.api.function.aggregation.StreamingMinAggregationFunction;
+import org.apache.flink.streaming.api.function.aggregation.StreamingAggregationFunction;
+import org.apache.flink.streaming.api.function.aggregation.StreamingSumAggregationFunction;
 import org.apache.flink.streaming.api.function.sink.PrintSinkFunction;
 import org.apache.flink.streaming.api.function.sink.SinkFunction;
 import org.apache.flink.streaming.api.function.sink.WriteFormatAsCsv;
@@ -60,6 +64,7 @@ import org.apache.flink.streaming.partitioner.ShufflePartitioner;
 import org.apache.flink.streaming.partitioner.StreamPartitioner;
 import org.apache.flink.streaming.util.serialization.FunctionTypeWrapper;
 import org.apache.flink.streaming.util.serialization.TypeSerializerWrapper;
+import org.apache.flink.types.TypeInformation;
 
 /**
  * A DataStream represents a stream of elements of the same type. A DataStream
@@ -465,6 +470,60 @@ public abstract class DataStream<OUT> {
 	}
 
 	/**
+	 * Applies an aggregation that sums the data stream at the given position.
+	 * 
+	 * @param positionToSum
+	 *            The position in the data point to sum
+	 * @return The transformed DataStream.
+	 */
+	public SingleOutputStreamOperator<OUT, ?> sum(int positionToSum) {
+		return aggregateAll(new StreamingSumAggregationFunction<OUT>(positionToSum));
+	}
+
+	/**
+	 * Applies an aggregation that gives the minimum of the data stream at the
+	 * given position.
+	 * 
+	 * @param positionToMin
+	 *            The position in the data point to minimize
+	 * @return The transformed DataStream.
+	 */
+	public SingleOutputStreamOperator<OUT, ?> min(int positionToMin) {
+		return aggregateAll(new StreamingMinAggregationFunction<OUT>(positionToMin));
+	}
+
+	/**
+	 * Applies an aggregation that gives the maximum of the data stream at the
+	 * given position.
+	 * 
+	 * @param positionToMax
+	 *            The position in the data point to maximize
+	 * @return The transformed DataStream.
+	 */
+	public SingleOutputStreamOperator<OUT, ?> max(int positionToMax) {
+		return aggregateAll(new StreamingMaxAggregationFunction<OUT>(positionToMax));
+	}
+
+	/** Applies the given aggregation to this data stream as a "reduce" operator. */
+	private SingleOutputStreamOperator<OUT, ?> aggregateAll(StreamingAggregationFunction<OUT> aggregate) {
+		return aggregate(aggregate, new StreamReduceInvokable<OUT>(aggregate), "reduce");
+	}
+
+	/**
+	 * Adds {@code aggregate} as an operator run by {@code invokable}. Aggregations do not change
+	 * the element type, so the new stream reuses this stream's type information and wrappers.
+	 */
+	SingleOutputStreamOperator<OUT, ?> aggregate(StreamingAggregationFunction<OUT> aggregate, StreamReduceInvokable<OUT> invokable, String functionName) {
+		DataStream<OUT> inputStream = this.copy();
+		TypeInformation<?> info = this.jobGraphBuilder.getOutTypeInfo(inputStream.getId());
+		aggregate.setType(info);
+		SingleOutputStreamOperator<OUT, ?> returnStream = inputStream.addFunction(functionName,
+				aggregate, null, null, invokable);
+		this.jobGraphBuilder.setTypeWrappersFrom(inputStream.getId(), returnStream.getId());
+		return returnStream;
+	}
+
+	/**
 	 * Applies a Filter transformation on a {@link DataStream}. The
 	 * transformation calls a {@link FilterFunction} for each element of the
 	 * DataStream and retains only those element for which the function returns
@@ -497,7 +556,7 @@ public abstract class DataStream<OUT> {
 		PrintSinkFunction<OUT> printFunction = new PrintSinkFunction<OUT>();
 		DataStreamSink<OUT> returnStream = addSink(inputStream, printFunction, null);
 
-		jobGraphBuilder.setBytesFrom(inputStream.getId(), returnStream.getId());
+		jobGraphBuilder.setInToOutTypeWrappersFrom(inputStream.getId(), returnStream.getId());
 
 		return returnStream;
 	}
@@ -853,9 +912,8 @@ public abstract class DataStream<OUT> {
 	 *            type of the return stream
 	 * @return the data stream constructed
 	 */
-	protected <R> SingleOutputStreamOperator<R, ?> addFunction(
-			String functionName, final Function function,
-			TypeSerializerWrapper<OUT> inTypeWrapper,
+	protected <R> SingleOutputStreamOperator<R, ?> addFunction(String functionName,
+			final Function function, TypeSerializerWrapper<OUT> inTypeWrapper,
 			TypeSerializerWrapper<R> outTypeWrapper,
 			StreamOperatorInvokable<OUT, R> functionInvokable) {
 		DataStream<OUT> inputStream = this.copy();

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/0c8f1dac/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
index 4d0265a..94d6c8d 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
@@ -19,7 +19,12 @@ package org.apache.flink.streaming.api.datastream;
 
 import org.apache.flink.api.common.functions.GroupReduceFunction;
 import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.java.functions.RichGroupReduceFunction;
 import org.apache.flink.api.java.functions.RichReduceFunction;
+import org.apache.flink.streaming.api.function.aggregation.StreamingAggregationFunction;
+import org.apache.flink.streaming.api.function.aggregation.StreamingMaxAggregationFunction;
+import org.apache.flink.streaming.api.function.aggregation.StreamingMinAggregationFunction;
+import org.apache.flink.streaming.api.function.aggregation.StreamingSumAggregationFunction;
 import org.apache.flink.streaming.api.invokable.operator.BatchGroupReduceInvokable;
 import org.apache.flink.streaming.api.invokable.operator.GroupReduceInvokable;
 import org.apache.flink.streaming.api.invokable.operator.WindowGroupReduceInvokable;
@@ -27,6 +32,8 @@ import org.apache.flink.streaming.api.invokable.util.DefaultTimestamp;
 import org.apache.flink.streaming.api.invokable.util.Timestamp;
 import org.apache.flink.streaming.util.serialization.FunctionTypeWrapper;
 
+//import org.apache.jasper.compiler.Node.ParamsAction;
+
 /**
  * A GroupedDataStream represents a data stream which has been partitioned by
  * the given key in the values. Operators like {@link #reduce},
@@ -46,12 +53,12 @@ public class GroupedDataStream<OUT> {
 	}
 
 	/**
-	 * Applies a reduce transformation on the grouped data stream grouped on by
-	 * the given key position. The {@link ReduceFunction} will receive input
-	 * values based on the key value. Only input values with the same key will
-	 * go to the same reducer.The user can also extend
-	 * {@link RichReduceFunction} to gain access to other features provided by
-	 * the {@link RichFuntion} interface.
+	 * Applies a reduce transformation on the grouped data stream grouped by the
+	 * given key position. The {@link ReduceFunction} will receive input values
+	 * based on the key value. Only input values with the same key will go to
+	 * the same reducer. The user can also extend {@link RichReduceFunction} to
+	 * gain access to other features provided by the {@link RichFunction}
+	 * interface.
 	 * 
 	 * @param reducer
 	 *            The {@link ReduceFunction} that will be called for every
@@ -160,17 +167,16 @@ public class GroupedDataStream<OUT> {
 			long windowSize, long slideInterval) {
 		return windowReduce(reducer, windowSize, slideInterval, new DefaultTimestamp<OUT>());
 	}
-	
+
 	/**
 	 * Applies a group reduce transformation on preset "time" chunks of the
 	 * grouped data stream in a sliding window fashion. The
 	 * {@link GroupReduceFunction} will receive input values based on the key
 	 * value. Only input values with the same key will go to the same reducer.
 	 * When the reducer has ran for all the values in the batch, the window is
-	 * shifted forward. The time is determined by a
-	 * user-defined timestamp. The user can also extend {@link RichGroupReduceFunction}
-	 * to gain access to other features provided by the {@link RichFuntion}
-	 * interface.
+	 * shifted forward. The time is determined by a user-defined timestamp. The
+	 * user can also extend {@link RichGroupReduceFunction} to gain access to
+	 * other features provided by the {@link RichFunction} interface.
 	 *
 	 * @param reducer
 	 *            The GroupReduceFunction that is called for each time window.
@@ -191,4 +197,46 @@ public class GroupedDataStream<OUT> {
 				windowSize, slideInterval, keyPosition, timestamp));
 	}
 
+	/**
+	 * Applies an aggregation that sums the grouped data stream at the given
+	 * position. Sums are computed separately for each key of this grouped
+	 * stream, so only values with the same key are added together.
+	 * 
+	 * @param positionToSum
+	 *            The position in the data point to sum
+	 * @return The transformed DataStream.
+	 */
+	public SingleOutputStreamOperator<OUT, ?> sum(final int positionToSum) {
+		return aggregateGroup(new StreamingSumAggregationFunction<OUT>(positionToSum));
+	}
+
+	/**
+	 * Applies an aggregation that gives the minimum of the grouped data stream
+	 * at the given position, separately for each key of this grouped stream.
+	 * 
+	 * @param positionToMin
+	 *            The position in the data point to minimize
+	 * @return The transformed DataStream.
+	 */
+	public SingleOutputStreamOperator<OUT, ?> min(final int positionToMin) {
+		return aggregateGroup(new StreamingMinAggregationFunction<OUT>(positionToMin));
+	}
+
+	/**
+	 * Applies an aggregation that gives the maximum of the grouped data stream
+	 * at the given position, separately for each key of this grouped stream.
+	 * 
+	 * @param positionToMax
+	 *            The position in the data point to maximize
+	 * @return The transformed DataStream.
+	 */
+	public SingleOutputStreamOperator<OUT, ?> max(final int positionToMax) {
+		return aggregateGroup(new StreamingMaxAggregationFunction<OUT>(positionToMax));
+	}
+
+	/** Applies the given aggregation to each key group of this stream. */
+	private SingleOutputStreamOperator<OUT, ?> aggregateGroup(StreamingAggregationFunction<OUT> aggregate) {
+		return this.dataStream.aggregate(aggregate,
+				new GroupReduceInvokable<OUT>(aggregate, keyPosition), "groupReduce");
+	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/0c8f1dac/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/ComparableAggregationFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/ComparableAggregationFunction.java
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/ComparableAggregationFunction.java
new file mode 100644
index 0000000..dc74715
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/ComparableAggregationFunction.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.function.aggregation;
+
+import org.apache.flink.api.java.tuple.Tuple;
+
+/**
+ * Base class for aggregations that keep one of two values based on a
+ * comparison, such as minimum and maximum.
+ * 
+ * <p>
+ * For {@link Tuple} records only the fields at the aggregated position are
+ * compared, and the result is a copy of the second tuple holding the
+ * selected field value. Other records must be {@link Comparable}, and the
+ * selected record itself is returned.
+ */
+public abstract class ComparableAggregationFunction<T> extends StreamingAggregationFunction<T> {
+
+	private static final long serialVersionUID = 1L;
+
+	public ComparableAggregationFunction(int positionToAggregate) {
+		super(positionToAggregate);
+	}
+
+	/**
+	 * Keeps {@code value2} unless {@link #isExtremal} selects {@code value1}.
+	 * Tuples are compared on the aggregated field only (see {@link #compare}).
+	 * 
+	 * @throws RuntimeException
+	 *             if the values are neither tuples nor {@link Comparable}
+	 */
+	@SuppressWarnings("unchecked")
+	@Override
+	public T reduce(T value1, T value2) throws Exception {
+		if (value1 instanceof Tuple) {
+			Tuple t1 = (Tuple) value1;
+			Tuple t2 = (Tuple) value2;
+
+			compare(t1, t2);
+
+			return (T) returnTuple;
+		} else if (value1 instanceof Comparable) {
+			// Same rule as for tuples: value1 replaces value2 only if it is more extremal.
+			return isExtremal((Comparable<Object>) value1, value2) ? value1 : value2;
+		} else {
+			throw new RuntimeException("The values " + value1 + " and " + value2
+					+ " cannot be compared.");
+		}
+	}
+
+	/**
+	 * Sets {@code returnTuple} to a copy of {@code tuple2} whose field at the
+	 * aggregated position is replaced by the one of {@code tuple1} if
+	 * {@link #isExtremal} selects it.
+	 */
+	public <R> void compare(Tuple tuple1, Tuple tuple2) throws InstantiationException,
+			IllegalAccessException {
+		copyTuple(tuple2);
+
+		Comparable<R> o1 = tuple1.getField(position);
+		R o2 = tuple2.getField(position);
+
+		if (isExtremal(o1, o2)) {
+			returnTuple.setField(o1, position);
+		}
+	}
+
+	/**
+	 * Returns {@code true} if {@code o1} should replace {@code o2} as the
+	 * aggregated value.
+	 */
+	public abstract <R> boolean isExtremal(Comparable<R> o1, R o2);
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/0c8f1dac/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/StreamingAggregationFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/StreamingAggregationFunction.java
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/StreamingAggregationFunction.java
new file mode 100644
index 0000000..42c1053
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/StreamingAggregationFunction.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.function.aggregation;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.types.TypeInformation;
+
+/**
+ * Base class of the streaming aggregations: a {@link ReduceFunction}
+ * parameterized with the position of the field to aggregate.
+ * 
+ * <p>
+ * {@link #setType} must be called before {@link #copyTuple} is used, since
+ * tuples are created and copied with the serializer obtained from that type.
+ */
+public abstract class StreamingAggregationFunction<T> implements ReduceFunction<T> {
+
+	private static final long serialVersionUID = 1L;
+
+	/** Position of the aggregated field within a tuple record. */
+	protected int position;
+
+	/** Serializer for the record type; assigned by {@link #setType}. */
+	private TypeSerializer<Tuple> typeSerializer;
+
+	/** The tuple most recently created by {@link #copyTuple}. */
+	protected Tuple returnTuple;
+
+	public StreamingAggregationFunction(int pos) {
+		position = pos;
+	}
+
+	/**
+	 * Sets the record type of this aggregation by creating its serializer.
+	 * 
+	 * @param type
+	 *            type information of the aggregated records
+	 */
+	@SuppressWarnings("unchecked")
+	public void setType(TypeInformation<?> type) {
+		TypeSerializer<?> serializer = type.createSerializer();
+		typeSerializer = (TypeSerializer<Tuple>) serializer;
+	}
+
+	/**
+	 * Replaces {@link #returnTuple} with a new tuple holding a copy of the
+	 * given one.
+	 */
+	protected void copyTuple(Tuple tuple) throws InstantiationException, IllegalAccessException {
+		TypeSerializer<Tuple> serializer = typeSerializer;
+		returnTuple = serializer.createInstance();
+		serializer.copy(tuple, returnTuple);
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/0c8f1dac/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/StreamingMaxAggregationFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/StreamingMaxAggregationFunction.java
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/StreamingMaxAggregationFunction.java
new file mode 100644
index 0000000..bae0043
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/StreamingMaxAggregationFunction.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.function.aggregation;
+
+/**
+ * Comparable aggregation that selects the larger of two values; created by
+ * the streaming {@code max} operators.
+ */
+public class StreamingMaxAggregationFunction<T> extends ComparableAggregationFunction<T> {
+
+	private static final long serialVersionUID = 1L;
+
+	public StreamingMaxAggregationFunction(int pos) {
+		super(pos);
+	}
+
+	/**
+	 * Returns {@code true} if {@code o1} is strictly greater than {@code o2}.
+	 */
+	@Override
+	public <R> boolean isExtremal(Comparable<R> o1, R o2) {
+		int comparison = o1.compareTo(o2);
+		return comparison > 0;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/0c8f1dac/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/StreamingMinAggregationFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/StreamingMinAggregationFunction.java
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/StreamingMinAggregationFunction.java
new file mode 100644
index 0000000..eb349c6
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/StreamingMinAggregationFunction.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.function.aggregation;
+
+/**
+ * Comparable aggregation that selects the smaller of two values; created by
+ * the streaming {@code min} operators.
+ */
+public class StreamingMinAggregationFunction<T> extends ComparableAggregationFunction<T> {
+
+	private static final long serialVersionUID = 1L;
+
+	public StreamingMinAggregationFunction(int pos) {
+		super(pos);
+	}
+
+	/**
+	 * Returns {@code true} if {@code o1} is strictly smaller than {@code o2}.
+	 */
+	@Override
+	public <R> boolean isExtremal(Comparable<R> o1, R o2) {
+		int comparison = o1.compareTo(o2);
+		return comparison < 0;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/0c8f1dac/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/StreamingSumAggregationFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/StreamingSumAggregationFunction.java
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/StreamingSumAggregationFunction.java
new file mode 100644
index 0000000..1a043c1
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/StreamingSumAggregationFunction.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.function.aggregation;
+
+import org.apache.flink.api.java.tuple.Tuple;
+
+/**
+ * Aggregation that adds up values: for tuples the fields at the aggregated
+ * position are summed, other records are summed directly. Supported types
+ * are {@link Byte}, {@link Short}, {@link Integer}, {@link Long},
+ * {@link Float} and {@link Double}.
+ */
+public class StreamingSumAggregationFunction<T> extends StreamingAggregationFunction<T> {
+
+	private static final long serialVersionUID = 1L;
+
+	public StreamingSumAggregationFunction(int pos) {
+		super(pos);
+	}
+
+	/**
+	 * Sums two records. For tuples the result is a copy of {@code value2}
+	 * whose aggregated field holds the sum of both fields.
+	 */
+	@SuppressWarnings("unchecked")
+	@Override
+	public T reduce(T value1, T value2) throws Exception {
+		if (!(value1 instanceof Tuple)) {
+			return (T) add(value1, value2);
+		}
+
+		Tuple first = (Tuple) value1;
+		Tuple second = (Tuple) value2;
+		copyTuple(second);
+		Object summed = add(first.getField(position), second.getField(position));
+		returnTuple.setField(summed, position);
+		return (T) returnTuple;
+	}
+
+	/**
+	 * Adds two boxed numbers of the same type, keeping that type.
+	 * 
+	 * @throws RuntimeException
+	 *             if the type of {@code left} is not supported
+	 */
+	private Object add(Object left, Object right) {
+		if (left instanceof Byte) {
+			return (byte) ((Byte) left + (Byte) right);
+		}
+		if (left instanceof Short) {
+			return (short) ((Short) left + (Short) right);
+		}
+		if (left instanceof Integer) {
+			return (Integer) left + (Integer) right;
+		}
+		if (left instanceof Long) {
+			return (Long) left + (Long) right;
+		}
+		if (left instanceof Float) {
+			return (Float) left + (Float) right;
+		}
+		if (left instanceof Double) {
+			return (Double) left + (Double) right;
+		}
+		throw new RuntimeException("DataStream cannot be summed because the class "
+				+ left.getClass().getSimpleName() + " does not support the + operator.");
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/0c8f1dac/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java
b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java
new file mode 100644
index 0000000..7a502aa
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.streaming.api.function.aggregation.StreamingMaxAggregationFunction;
+import org.apache.flink.streaming.api.function.aggregation.StreamingMinAggregationFunction;
+import org.apache.flink.streaming.api.function.aggregation.StreamingSumAggregationFunction;
+import org.apache.flink.streaming.api.invokable.operator.GroupReduceInvokable;
+import org.apache.flink.streaming.api.invokable.operator.StreamReduceInvokable;
+import org.apache.flink.streaming.util.MockInvokable;
+import org.junit.Test;
+
+/**
+ * Tests for the streaming sum, min and max aggregation functions. Each is
+ * applied to the whole stream (via {@link StreamReduceInvokable}) and per key
+ * (via {@link GroupReduceInvokable} grouped on field 0).
+ */
+public class AggregationFunctionTest {
+
+	/**
+	 * Feeds the tuples {@code (i % 3, i)} for {@code i = 0..8} through every
+	 * aggregation of field 1. It then compares the emitted results, one per input
+	 * element, with the expected running aggregates.
+	 */
+	@Test
+	public void groupSumIntegerTest() {
+		List<Tuple2<Integer, Integer>> inputList = new ArrayList<Tuple2<Integer, Integer>>();
+
+		List<Tuple2<Integer, Integer>> expectedSumList = new ArrayList<Tuple2<Integer, Integer>>();
+		List<Tuple2<Integer, Integer>> expectedMinList = new ArrayList<Tuple2<Integer, Integer>>();
+		List<Tuple2<Integer, Integer>> expectedMaxList = new ArrayList<Tuple2<Integer, Integer>>();
+		List<Tuple2<Integer, Integer>> expectedGroupSumList = new ArrayList<Tuple2<Integer, Integer>>();
+		List<Tuple2<Integer, Integer>> expectedGroupMinList = new ArrayList<Tuple2<Integer, Integer>>();
+		List<Tuple2<Integer, Integer>> expectedGroupMaxList = new ArrayList<Tuple2<Integer, Integer>>();
+
+		// Running sum of field 1 for each key; the key (field 0) is i % 3.
+		int[] groupedSums = new int[3];
+
+		for (int i = 0; i < 9; i++) {
+			int key = i % 3;
+			inputList.add(new Tuple2<Integer, Integer>(key, i));
+
+			// Ungrouped: field 0 is expected to follow the latest element, while
+			// field 1 aggregates everything seen so far (sum 0..i, min 0, max i).
+			expectedSumList.add(new Tuple2<Integer, Integer>(key, (i + 1) * i / 2));
+			expectedMinList.add(new Tuple2<Integer, Integer>(key, 0));
+			expectedMaxList.add(new Tuple2<Integer, Integer>(key, i));
+
+			// Grouped on field 0: only elements with the same key are combined.
+			// The first element of group k is (k, k), so k is its minimum.
+			groupedSums[key] += i;
+			expectedGroupSumList.add(new Tuple2<Integer, Integer>(key, groupedSums[key]));
+			expectedGroupMinList.add(new Tuple2<Integer, Integer>(key, key));
+			expectedGroupMaxList.add(new Tuple2<Integer, Integer>(key, i));
+		}
+
+		// Every function aggregates field 1.
+		StreamingSumAggregationFunction<Tuple2<Integer, Integer>> sumFunction =
+				new StreamingSumAggregationFunction<Tuple2<Integer, Integer>>(1);
+		StreamingMinAggregationFunction<Tuple2<Integer, Integer>> minFunction =
+				new StreamingMinAggregationFunction<Tuple2<Integer, Integer>>(1);
+		StreamingMaxAggregationFunction<Tuple2<Integer, Integer>> maxFunction =
+				new StreamingMaxAggregationFunction<Tuple2<Integer, Integer>>(1);
+
+		// Supply the element type information, derived from a sample tuple.
+		sumFunction.setType(TypeExtractor.getForObject(new Tuple2<Integer, Integer>(0, 0)));
+		minFunction.setType(TypeExtractor.getForObject(new Tuple2<Integer, Integer>(0, 0)));
+		maxFunction.setType(TypeExtractor.getForObject(new Tuple2<Integer, Integer>(0, 0)));
+
+		List<Tuple2<Integer, Integer>> sumList = MockInvokable.createAndExecute(
+				new StreamReduceInvokable<Tuple2<Integer, Integer>>(sumFunction), inputList);
+		List<Tuple2<Integer, Integer>> minList = MockInvokable.createAndExecute(
+				new StreamReduceInvokable<Tuple2<Integer, Integer>>(minFunction), inputList);
+		List<Tuple2<Integer, Integer>> maxList = MockInvokable.createAndExecute(
+				new StreamReduceInvokable<Tuple2<Integer, Integer>>(maxFunction), inputList);
+
+		// The same function instances are reused for the grouped runs (key = field 0).
+		List<Tuple2<Integer, Integer>> groupedSumList = MockInvokable.createAndExecute(
+				new GroupReduceInvokable<Tuple2<Integer, Integer>>(sumFunction, 0), inputList);
+		List<Tuple2<Integer, Integer>> groupedMinList = MockInvokable.createAndExecute(
+				new GroupReduceInvokable<Tuple2<Integer, Integer>>(minFunction, 0), inputList);
+		List<Tuple2<Integer, Integer>> groupedMaxList = MockInvokable.createAndExecute(
+				new GroupReduceInvokable<Tuple2<Integer, Integer>>(maxFunction, 0), inputList);
+
+		assertEquals(expectedSumList, sumList);
+		assertEquals(expectedMinList, minList);
+		assertEquals(expectedMaxList, maxList);
+		assertEquals(expectedGroupSumList, groupedSumList);
+		assertEquals(expectedGroupMinList, groupedMinList);
+		assertEquals(expectedGroupMaxList, groupedMaxList);
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/0c8f1dac/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCountLocal.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCountLocal.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCountLocal.java
index e554d91..f78cd1a 100644
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCountLocal.java
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCountLocal.java
@@ -20,40 +20,29 @@ package org.apache.flink.streaming.examples.wordcount;
 import java.util.StringTokenizer;
 
 import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.util.TestDataUtil;
 import org.apache.flink.util.Collector;
 
-
 // This example will count the occurrence of each word in the input file.
 public class WordCountLocal {
 
-	public static class WordCountSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
+	public static class WordCountSplitter implements
+			FlatMapFunction<String, Tuple2<String, Integer>> {
 		private static final long serialVersionUID = 1L;
 
 		@Override
-		public void flatMap(String inTuple, Collector<Tuple2<String, Integer>> out) throws Exception {
+		public void flatMap(String inTuple, Collector<Tuple2<String, Integer>> out)
+				throws Exception {
 			StringTokenizer tokenizer = new StringTokenizer(inTuple);
 			while (tokenizer.hasMoreTokens()) {
 				out.collect(new Tuple2<String, Integer>(tokenizer.nextToken(), 1));
 			}
 		}
 	}
-	
-	public static class WordCountCounter implements ReduceFunction<Tuple2<String, Integer>> {
-		private static final long serialVersionUID = 1L;
 
-		@Override
-		public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1,
-				Tuple2<String, Integer> value2) throws Exception {
-			return new Tuple2<String, Integer>(value1.f0, value1.f1 + value2.f1);
-		}
-
-	}
-	
 	public static void main(String[] args) {
 
 		TestDataUtil.downloadIfNotExists("hamlet.txt");
@@ -61,10 +50,8 @@ public class WordCountLocal {
 
 		DataStream<Tuple2<String, Integer>> dataStream = env
 				.readTextFile("src/test/resources/testdata/hamlet.txt")
-				.flatMap(new WordCountSplitter())
-				.groupBy(0)
-				.reduce(new WordCountCounter());
-		
+				.flatMap(new WordCountSplitter()).groupBy(0).sum(1);
+
 		dataStream.print();
 
 		env.execute();


Mime
View raw message