flink-commits mailing list archives

From: se...@apache.org
Subject: [18/28] git commit: [streaming] reduce operator added to DataStream and ConnectedDataStream + grouped batch and windowReduce operators reworked
Date: Fri, 29 Aug 2014 19:03:51 GMT
[streaming] reduce operator added to DataStream and ConnectedDataStream + grouped batch and windowReduce operators reworked


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/90ad6b04
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/90ad6b04
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/90ad6b04

Branch: refs/heads/master
Commit: 90ad6b0468724926be1d6e5d90f2987785a24778
Parents: cad38ed
Author: gyfora <gyula.fora@gmail.com>
Authored: Mon Aug 25 16:54:19 2014 +0200
Committer: Stephan Ewen <sewen@apache.org>
Committed: Fri Aug 29 21:01:57 2014 +0200

----------------------------------------------------------------------
 .../api/datastream/ConnectedDataStream.java     |  26 ++
 .../streaming/api/datastream/DataStream.java    |  17 +
 .../api/datastream/GroupedDataStream.java       | 322 ++++++++++---------
 .../operator/BatchGroupReduceInvokable.java     |  41 ++-
 .../operator/BatchReduceInvokable.java          |  89 ++++-
 .../operator/GroupReduceInvokable.java          |  26 +-
 .../operator/StreamReduceInvokable.java         |  89 ++---
 .../operator/WindowGroupReduceInvokable.java    |  44 ++-
 .../operator/WindowReduceInvokable.java         |  24 +-
 .../operator/co/CoReduceInvokable.java          |  58 ++++
 .../flink/streaming/state/TableState.java       |  11 +-
 .../examples/wordcount/WordCountLocal.java      |   2 +-
 12 files changed, 416 insertions(+), 333 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/90ad6b04/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
index 920278c..bd0c607 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
@@ -27,10 +27,12 @@ import org.apache.flink.streaming.api.JobGraphBuilder;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.function.co.CoFlatMapFunction;
 import org.apache.flink.streaming.api.function.co.CoMapFunction;
+import org.apache.flink.streaming.api.function.co.CoReduceFunction;
 import org.apache.flink.streaming.api.function.co.RichCoMapFunction;
 import org.apache.flink.streaming.api.invokable.operator.co.CoFlatMapInvokable;
 import org.apache.flink.streaming.api.invokable.operator.co.CoInvokable;
 import org.apache.flink.streaming.api.invokable.operator.co.CoMapInvokable;
+import org.apache.flink.streaming.api.invokable.operator.co.CoReduceInvokable;
 import org.apache.flink.streaming.util.serialization.FunctionTypeWrapper;
 import org.apache.flink.streaming.util.serialization.TypeSerializerWrapper;
 
@@ -152,6 +154,30 @@ public class ConnectedDataStream<IN1, IN2> {
 		return addCoFunction("coFlatMap", coFlatMapper, in1TypeWrapper, in2TypeWrapper,
 				outTypeWrapper, new CoFlatMapInvokable<IN1, IN2, OUT>(coFlatMapper));
 	}
+	
+	/**
+	 * Applies a CoReduce transformation on the data stream. The transformation calls
+	 * {@link CoReduceFunction#reduce1} and {@link CoReduceFunction#map1} for
+	 * each element of the first input and {@link CoReduceFunction#reduce2} and
+	 * {@link CoReduceFunction#map2} for each element of the second input. 
+	 * 
+	 * @param coReducer
+	 *            The {@link CoReduceFunction} that will be called for every
+	 *            element of each input DataStream.
+	 * @return The transformed DataStream.
+	 */
+	public <OUT> SingleOutputStreamOperator<OUT, ?> reduce(CoReduceFunction<IN1, IN2, OUT> coReducer) {
+
+		FunctionTypeWrapper<IN1> in1TypeWrapper = new FunctionTypeWrapper<IN1>(coReducer,
+				CoReduceFunction.class, 0);
+		FunctionTypeWrapper<IN2> in2TypeWrapper = new FunctionTypeWrapper<IN2>(coReducer,
+				CoReduceFunction.class, 1);
+		FunctionTypeWrapper<OUT> outTypeWrapper = new FunctionTypeWrapper<OUT>(coReducer,
+				CoReduceFunction.class, 2);
+
+		return addCoFunction("coReduce", coReducer, in1TypeWrapper, in2TypeWrapper, outTypeWrapper,
+				new CoReduceInvokable<IN1, IN2, OUT>(coReducer));
+	}
 
 	protected <OUT> SingleOutputStreamOperator<OUT, ?> addCoFunction(String functionName,
 			final Function function, TypeSerializerWrapper<IN1> in1TypeWrapper,

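For illustration only, a minimal sketch of how the new reduce call on a ConnectedDataStream might be used. The connected stream (connectedStream, of type ConnectedDataStream<Integer, String>) and the element types are hypothetical, and the reduce1/map1 and reduce2/map2 signatures follow the shape implied by CoReduceInvokable further down; imports are omitted.

	SingleOutputStreamOperator<String, ?> reduced = connectedStream
			.reduce(new CoReduceFunction<Integer, String, String>() {

				@Override
				public Integer reduce1(Integer value1, Integer value2) {
					// rolling sum over the first input
					return value1 + value2;
				}

				@Override
				public String reduce2(String value1, String value2) {
					// rolling concatenation over the second input
					return value1 + value2;
				}

				@Override
				public String map1(Integer value) {
					// every element of the first input emits the current sum
					return value.toString();
				}

				@Override
				public String map2(String value) {
					// every element of the second input emits the current concatenation
					return value;
				}
			});
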
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/90ad6b04/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index 53e302e..5d5909c 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -28,10 +28,12 @@ import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.api.common.functions.GroupReduceFunction;
 import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.java.functions.RichFilterFunction;
 import org.apache.flink.api.java.functions.RichFlatMapFunction;
 import org.apache.flink.api.java.functions.RichGroupReduceFunction;
 import org.apache.flink.api.java.functions.RichMapFunction;
+import org.apache.flink.api.java.functions.RichReduceFunction;
 import org.apache.flink.streaming.api.JobGraphBuilder;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.function.sink.PrintSinkFunction;
@@ -46,6 +48,7 @@ import org.apache.flink.streaming.api.invokable.operator.BatchReduceInvokable;
 import org.apache.flink.streaming.api.invokable.operator.FilterInvokable;
 import org.apache.flink.streaming.api.invokable.operator.FlatMapInvokable;
 import org.apache.flink.streaming.api.invokable.operator.MapInvokable;
+import org.apache.flink.streaming.api.invokable.operator.StreamReduceInvokable;
 import org.apache.flink.streaming.api.invokable.operator.WindowReduceInvokable;
 import org.apache.flink.streaming.partitioner.BroadcastPartitioner;
 import org.apache.flink.streaming.partitioner.DistributePartitioner;
@@ -289,6 +292,20 @@ public abstract class DataStream<OUT> {
 		return addFunction("flatMap", flatMapper, inTypeWrapper,
 				outTypeWrapper, new FlatMapInvokable<OUT, R>(flatMapper));
 	}
+	
+	/**
+	 * Applies a reduce transformation on the data stream. The user can also
+	 * extend the {@link RichReduceFunction} to gain access to other features
+	 * provided by the {@link RichFunction} interface.
+	 * 
+	 * @param reducer
+	 *            The {@link ReduceFunction} that will be called for every
+	 *            element of the input values.
+	 * @return The transformed DataStream.
+	 */
+	public SingleOutputStreamOperator<OUT, ?> reduce(ReduceFunction<OUT> reducer) {
+		return addFunction("reduce", reducer, new FunctionTypeWrapper<OUT>(reducer, ReduceFunction.class, 0),
+				new FunctionTypeWrapper<OUT>(reducer, ReduceFunction.class, 0), new StreamReduceInvokable<OUT>(reducer));
+	}
 
 	public GroupedDataStream<OUT> groupBy(int keyPosition) {
 		return new GroupedDataStream<OUT>(this,	keyPosition);

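For illustration only, a minimal sketch of the new rolling reduce on a plain DataStream; someIntegers stands for a hypothetical DataStream<Integer>, and, as StreamReduceInvokable further down shows, the current aggregate is emitted for every incoming element.

	SingleOutputStreamOperator<Integer, ?> runningSum = someIntegers
			.reduce(new ReduceFunction<Integer>() {
				@Override
				public Integer reduce(Integer value1, Integer value2) {
					// combine the previous aggregate with the next element
					return value1 + value2;
				}
			});
	// for the input elements 1, 2, 3 the operator emits 1, 3, 6
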
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/90ad6b04/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
index a532b18..bc2ac38 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
@@ -1,161 +1,165 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
  * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.datastream;
-
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.java.functions.RichReduceFunction;
-import org.apache.flink.streaming.api.invokable.operator.BatchGroupReduceInvokable;
-import org.apache.flink.streaming.api.invokable.operator.GroupReduceInvokable;
-import org.apache.flink.streaming.api.invokable.operator.WindowGroupReduceInvokable;
-import org.apache.flink.streaming.util.serialization.FunctionTypeWrapper;
-import org.apache.flink.streaming.util.serialization.TypeSerializerWrapper;
-
-/**
- * A GroupedDataStream represents a data stream which has been partitioned by
- * the given key in the values. Operators like {@link #reduce},
- * {@link #batchReduce} etc. can be applied on the {@link GroupedDataStream}.
- *
- * @param <OUT>
- *            The output type of the {@link GroupedDataStream}.
- */
-public class GroupedDataStream<OUT> {
-
-	DataStream<OUT> dataStream;
-	int keyPosition;
-
-	protected GroupedDataStream(DataStream<OUT> dataStream, int keyPosition) {
-		this.dataStream = dataStream.partitionBy(keyPosition);
-		this.keyPosition = keyPosition;
-	}
-
-	/**
-	 * Applies a reduce transformation on the grouped data stream grouped on by
-	 * the given key position. The {@link ReduceFunction} will receive input
-	 * values based on the key value. Only input values with the same key will
-	 * go to the same reducer.The user can also extend
-	 * {@link RichReduceFunction} to gain access to other features provided by
-	 * the {@link RichFuntion} interface.
-	 * 
-	 * @param reducer
-	 *            The {@link ReduceFunction} that will be called for every
-	 *            element of the input values with the same key.
-	 * @return The transformed DataStream.
-	 */
-	public SingleOutputStreamOperator<OUT, ?> reduce(ReduceFunction<OUT> reducer) {
-		return dataStream.addFunction("groupReduce", reducer, getTypeWrapper(reducer),
-				getTypeWrapper(reducer), new GroupReduceInvokable<OUT>(reducer, keyPosition));
-	}
-
-	/**
-	 * Applies a group reduce transformation on preset chunks of the grouped
-	 * data stream. The {@link ReduceFunction} will receive input values based
-	 * on the key value. Only input values with the same key will go to the same
-	 * reducer.When the reducer has ran for all the values in the batch, the
-	 * batch is slid forward. access to other features provided by the
-	 * {@link RichFuntion} interface.
-	 * 
-	 * 
-	 * @param reducer
-	 *            The {@link ReduceFunction} that will be called for every
-	 *            element of the input values with the same key.
-	 * @param batchSize
-	 *            The size of the data stream chunk (the number of values in the
-	 *            batch).
-	 * @return The transformed {@link DataStream}.
-	 */
-	public SingleOutputStreamOperator<OUT, ?> batchReduce(ReduceFunction<OUT> reducer, int batchSize) {
-		return batchReduce(reducer, batchSize, batchSize);
-	}
-
-	/**
-	 * Applies a group reduce transformation on preset chunks of the grouped
-	 * data stream in a sliding window fashion. The {@link ReduceFunction} will
-	 * receive input values based on the key value. Only input values with the
-	 * same key will go to the same reducer. When the reducer has ran for all
-	 * the values in the batch, the batch is slid forward. The user can also
-	 * extend {@link RichReduceFunction} to gain access to other features
-	 * provided by the {@link RichFuntion} interface.
-	 * 
-	 * @param reducer
-	 *            The {@link ReduceFunction} that will be called for every
-	 *            element of the input values with the same key.
-	 * @param batchSize
-	 *            The size of the data stream chunk (the number of values in the
-	 *            batch).
-	 * @param slideSize
-	 *            The number of values the batch is slid by.
-	 * @return The transformed {@link DataStream}.
-	 */
-	public SingleOutputStreamOperator<OUT, ?> batchReduce(ReduceFunction<OUT> reducer,
-			long batchSize, long slideSize) {
-
-		return dataStream.addFunction("batchReduce", reducer, getTypeWrapper(reducer),
-				getTypeWrapper(reducer), new BatchGroupReduceInvokable<OUT>(reducer, batchSize,
-						slideSize, keyPosition));
-	}
-
-	/**
-	 * Applies a group reduce transformation on preset "time" chunks of the
-	 * grouped data stream. The {@link ReduceFunction} will receive input values
-	 * based on the key value. Only input values with the same key will go to
-	 * the same reducer.When the reducer has ran for all the values in the
-	 * batch, the window is shifted forward gain access to other features
-	 * provided by the {@link RichFuntion} interface.
-	 * 
-	 * 
-	 * @param reducer
-	 *            The GroupReduceFunction that is called for each time window.
-	 * @param windowSize
-	 *            SingleOutputStreamOperator The time window to run the reducer
-	 *            on, in milliseconds.
-	 * @return The transformed DataStream.
-	 */
-	public SingleOutputStreamOperator<OUT, ?> windowReduce(ReduceFunction<OUT> reducer,
-			long windowSize) {
-		return windowReduce(reducer, windowSize, windowSize, windowSize);
-	}
-
-	/**
-	 * Applies a group reduce transformation on preset "time" chunks of the
-	 * grouped data stream in a sliding window fashion. The
-	 * {@link ReduceFunction} will receive input values based on the key value.
-	 * Only input values with the same key will go to the same reducer. When the
-	 * reducer has ran for all the values in the batch, the window is shifted
-	 * forward. The user can also extend {@link RichReduceFunction} to gain
-	 * access to other features provided by the {@link RichFuntion} interface.
-	 *
-	 * @param reducer
-	 *            The GroupReduceFunction that is called for each time window.
-	 * @param windowSize
-	 *            SingleOutputStreamOperator The time window to run the reducer
-	 *            on, in milliseconds.
-	 * @param slideSize
-	 *            The time interval the batch is slid by.
-	 * @return The transformed DataStream.
-	 */
-	public SingleOutputStreamOperator<OUT, ?> windowReduce(ReduceFunction<OUT> reducer,
-			long windowSize, long slideInterval, long timeUnitInMillis) {
-		return dataStream.addFunction("batchReduce", reducer, getTypeWrapper(reducer),
-				getTypeWrapper(reducer), new WindowGroupReduceInvokable<OUT>(reducer, windowSize,
-						slideInterval, keyPosition));
-	}
-
-	private TypeSerializerWrapper<OUT> getTypeWrapper(ReduceFunction<OUT> reducer) {
-		return new FunctionTypeWrapper<OUT>(reducer, ReduceFunction.class, 0);
-	}
-}
+ */
+
+package org.apache.flink.streaming.api.datastream;
+
+import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.java.functions.RichReduceFunction;
+import org.apache.flink.streaming.api.invokable.operator.BatchGroupReduceInvokable;
+import org.apache.flink.streaming.api.invokable.operator.GroupReduceInvokable;
+import org.apache.flink.streaming.api.invokable.operator.WindowGroupReduceInvokable;
+import org.apache.flink.streaming.util.serialization.FunctionTypeWrapper;
+
+/**
+ * A GroupedDataStream represents a data stream which has been partitioned by
+ * the given key in the values. Operators like {@link #reduce},
+ * {@link #batchReduce} etc. can be applied on the {@link GroupedDataStream}.
+ *
+ * @param <OUT>
+ *            The output type of the {@link GroupedDataStream}.
+ */
+public class GroupedDataStream<OUT> {
+
+	DataStream<OUT> dataStream;
+	int keyPosition;
+
+	protected GroupedDataStream(DataStream<OUT> dataStream, int keyPosition) {
+		this.dataStream = dataStream.partitionBy(keyPosition);
+		this.keyPosition = keyPosition;
+	}
+
+	/**
+	 * Applies a reduce transformation on the data stream grouped by the given
+	 * key position. The {@link ReduceFunction} will receive input values based
+	 * on the key value. Only input values with the same key will go to the same
+	 * reducer. The user can also extend {@link RichReduceFunction} to gain
+	 * access to other features provided by the {@link RichFunction} interface.
+	 * 
+	 * @param reducer
+	 *            The {@link ReduceFunction} that will be called for every
+	 *            element of the input values with the same key.
+	 * @return The transformed DataStream.
+	 */
+	public SingleOutputStreamOperator<OUT, ?> reduce(ReduceFunction<OUT> reducer) {
+		return dataStream.addFunction("groupReduce", reducer, new FunctionTypeWrapper<OUT>(reducer,
+				ReduceFunction.class, 0), new FunctionTypeWrapper<OUT>(reducer,
+				ReduceFunction.class, 0), new GroupReduceInvokable<OUT>(reducer, keyPosition));
+	}
+
+	/**
+	 * Applies a group reduce transformation on preset chunks of the grouped
+	 * data stream. The {@link GroupReduceFunction} will receive input values
+	 * based on the key value. Only input values with the same key will go to
+	 * the same reducer. When the reducer has run for all the values in the
+	 * batch, the batch is slid forward. The user can also extend
+	 * {@link RichGroupReduceFunction} to gain access to other features provided
+	 * by the {@link RichFunction} interface.
+	 * 
+	 * 
+	 * @param reducer
+	 *            The {@link GroupReduceFunction} that will be called for every
+	 *            element of the input values with the same key.
+	 * @param batchSize
+	 *            The size of the data stream chunk (the number of values in the
+	 *            batch).
+	 * @return The transformed {@link DataStream}.
+	 */
+	public <R> SingleOutputStreamOperator<R, ?> batchReduce(GroupReduceFunction<OUT, R> reducer,
+			int batchSize) {
+		return batchReduce(reducer, batchSize, batchSize);
+	}
+
+	/**
+	 * Applies a group reduce transformation on preset chunks of the grouped
+	 * data stream in a sliding window fashion. The {@link GroupReduceFunction}
+	 * will receive input values based on the key value. Only input values with
+	 * the same key will go to the same reducer. When the reducer has run for
+	 * all the values in the batch, the batch is slid forward. The user can also
+	 * extend {@link RichGroupReduceFunction} to gain access to other features
+	 * provided by the {@link RichFunction} interface.
+	 * 
+	 * @param reducer
+	 *            The {@link GroupReduceFunction} that will be called for every
+	 *            element of the input values with the same key.
+	 * @param batchSize
+	 *            The size of the data stream chunk (the number of values in the
+	 *            batch).
+	 * @param slideSize
+	 *            The number of values the batch is slid by.
+	 * @return The transformed {@link DataStream}.
+	 */
+	public <R> SingleOutputStreamOperator<R, ?> batchReduce(GroupReduceFunction<OUT, R> reducer,
+			long batchSize, long slideSize) {
+
+		return dataStream.addFunction("batchReduce", reducer, new FunctionTypeWrapper<OUT>(reducer,
+				GroupReduceFunction.class, 0), new FunctionTypeWrapper<R>(reducer,
+				GroupReduceFunction.class, 1), new BatchGroupReduceInvokable<OUT, R>(reducer,
+				batchSize, slideSize, keyPosition));
+	}
+
+	/**
+	 * Applies a group reduce transformation on preset "time" chunks of the
+	 * grouped data stream. The {@link GroupReduceFunction} will receive input
+	 * values based on the key value. Only input values with the same key will
+	 * go to the same reducer. When the reducer has run for all the values in
+	 * the batch, the window is shifted forward. The user can also extend
+	 * {@link RichGroupReduceFunction} to gain access to other features provided
+	 * by the {@link RichFunction} interface.
+	 * 
+	 * 
+	 * @param reducer
+	 *            The GroupReduceFunction that is called for each time window.
+	 * @param windowSize
+	 *            The time window to run the reducer on, in milliseconds.
+	 * @return The transformed DataStream.
+	 */
+	public <R> SingleOutputStreamOperator<R, ?> windowReduce(GroupReduceFunction<OUT, R> reducer,
+			long windowSize) {
+		return windowReduce(reducer, windowSize, windowSize, windowSize);
+	}
+
+	/**
+	 * Applies a group reduce transformation on preset "time" chunks of the
+	 * grouped data stream in a sliding window fashion. The
+	 * {@link GroupReduceFunction} will receive input values based on the key
+	 * value. Only input values with the same key will go to the same reducer.
+	 * When the reducer has run for all the values in the batch, the window is
+	 * shifted forward. The user can also extend {@link RichGroupReduceFunction}
+	 * to gain access to other features provided by the {@link RichFunction}
+	 * interface.
+	 *
+	 * @param reducer
+	 *            The GroupReduceFunction that is called for each time window.
+	 * @param windowSize
+	 *            The time window to run the reducer on, in milliseconds.
+	 * @param slideInterval
+	 *            The time interval the window is slid by.
+	 * @return The transformed DataStream.
+	 */
+	public <R> SingleOutputStreamOperator<R, ?> windowReduce(GroupReduceFunction<OUT, R> reducer,
+			long windowSize, long slideInterval, long timeUnitInMillis) {
+		return dataStream.addFunction("batchReduce", reducer, new FunctionTypeWrapper<OUT>(reducer,
+				GroupReduceFunction.class, 0), new FunctionTypeWrapper<R>(reducer,
+				GroupReduceFunction.class, 1), new WindowGroupReduceInvokable<OUT, R>(reducer,
+				windowSize, slideInterval, keyPosition));
+	}
+
+}

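For illustration only, a sketch of the reworked grouped operators on a hypothetical DataStream<Tuple2<String, Integer>> called wordCounts, grouped on field 0 (imports omitted). The rolling reduce combines only elements that share a key, while batchReduce now takes a GroupReduceFunction whose Iterable holds the per-key elements of one batch, matching the reducer.reduce(userIterable, collector) call in BatchReduceInvokable; windowReduce takes the same kind of function, but with the window size and slide interval given in milliseconds.

	// rolling per-key reduce: a running count per word
	wordCounts.groupBy(0).reduce(new ReduceFunction<Tuple2<String, Integer>>() {
		@Override
		public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1,
				Tuple2<String, Integer> value2) {
			return new Tuple2<String, Integer>(value1.f0, value1.f1 + value2.f1);
		}
	});

	// grouped batch reduce over batches of 100 elements, one reduce call per key
	wordCounts.groupBy(0).batchReduce(
			new GroupReduceFunction<Tuple2<String, Integer>, Integer>() {
				@Override
				public void reduce(Iterable<Tuple2<String, Integer>> values,
						Collector<Integer> out) {
					int sum = 0;
					for (Tuple2<String, Integer> value : values) {
						sum += value.f1;
					}
					out.collect(sum);
				}
			}, 100);
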
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/90ad6b04/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchGroupReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchGroupReduceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchGroupReduceInvokable.java
index 78185ed..95b3249 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchGroupReduceInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchGroupReduceInvokable.java
@@ -17,58 +17,55 @@
 
 package org.apache.flink.streaming.api.invokable.operator;
 
+import java.util.ArrayList;
 import java.util.Iterator;
+import java.util.List;
 
-import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.functions.GroupReduceFunction;
 import org.apache.flink.streaming.api.streamrecord.StreamRecord;
 import org.apache.flink.streaming.state.MutableTableState;
 
-public class BatchGroupReduceInvokable<IN> extends BatchReduceInvokable<IN, IN> {
+public class BatchGroupReduceInvokable<IN, OUT> extends BatchReduceInvokable<IN, OUT> {
 
 	private static final long serialVersionUID = 1L;
 
 	int keyPosition;
-	protected ReduceFunction<IN> reducer;
+	protected GroupReduceFunction<IN, OUT> reducer;
 	private Iterator<StreamRecord<IN>> iterator;
-	private MutableTableState<Object, IN> values;
+	private MutableTableState<Object, List<IN>> values;
 
-	public BatchGroupReduceInvokable(ReduceFunction<IN> reduceFunction, long batchSize,
+	public BatchGroupReduceInvokable(GroupReduceFunction<IN, OUT> reduceFunction, long batchSize,
 			long slideSize, int keyPosition) {
 		super(reduceFunction, batchSize, slideSize);
 		this.keyPosition = keyPosition;
 		this.reducer = reduceFunction;
-		values = new MutableTableState<Object, IN>();
+		values = new MutableTableState<Object, List<IN>>();
 	}
 
-	private IN reduced;
 	private IN nextValue;
-	private IN currentValue;
 
 	@Override
 	protected void reduce() {
 		iterator = state.getStreamRecordIterator();
 		while (iterator.hasNext()) {
 			StreamRecord<IN> nextRecord = iterator.next();
-
-			nextValue = nextRecord.getObject();
 			Object key = nextRecord.getField(keyPosition);
+			nextValue = nextRecord.getObject();
 
-			currentValue = values.get(key);
-			if (currentValue != null) {
-				callUserFunctionAndLogException();
-				values.put(key, reduced);
-				collector.collect(reduced);
+			List<IN> group = values.get(key);
+			if (group != null) {
+				group.add(nextValue);
 			} else {
-				values.put(key, nextValue);
-				collector.collect(nextValue);
+				group = new ArrayList<IN>();
+				group.add(nextValue);
+				values.put(key, group);
 			}
 		}
+		for (List<IN> group : values.values()) {
+			userIterable = group;
+			callUserFunctionAndLogException();
+		}
 		values.clear();
 	}
 
-	@Override
-	protected void callUserFunction() throws Exception {
-		reduced = reducer.reduce(currentValue, nextValue);
-	}
-
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/90ad6b04/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceInvokable.java
index a320017..df07675 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceInvokable.java
@@ -17,16 +17,30 @@
 
 package org.apache.flink.streaming.api.invokable.operator;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+
 import org.apache.commons.math.util.MathUtils;
 import org.apache.flink.api.common.functions.GroupReduceFunction;
-import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.invokable.UserTaskInvokable;
+import org.apache.flink.streaming.api.streamrecord.StreamRecord;
 import org.apache.flink.streaming.state.SlidingWindowState;
 
-public class BatchReduceInvokable<IN, OUT> extends StreamReduceInvokable<IN, OUT> {
+public class BatchReduceInvokable<IN, OUT> extends UserTaskInvokable<IN, OUT> {
+
 	private static final long serialVersionUID = 1L;
+	protected GroupReduceFunction<IN, OUT> reducer;
+	protected BatchIterator<IN> userIterator;
+	protected Iterable<IN> userIterable;
+	protected long slideSize;
+	protected long granularity;
+	protected int listSize;
+	protected transient SlidingWindowState<IN> state;
+	
 	private long batchSize;
-	int counter = 0;
+	private int counter = 0;
 
 	public BatchReduceInvokable(GroupReduceFunction<IN, OUT> reduceFunction, long batchSize,
 			long slideSize) {
@@ -37,15 +51,6 @@ public class BatchReduceInvokable<IN, OUT> extends StreamReduceInvokable<IN, OUT
 		this.granularity = MathUtils.gcd(batchSize, slideSize);
 		this.listSize = (int) granularity;
 	}
-	
-	protected BatchReduceInvokable(ReduceFunction<IN> reduceFunction, long batchSize,
-			long slideSize) {
-		super(reduceFunction);
-		this.batchSize = batchSize;
-		this.slideSize = slideSize;
-		this.granularity = MathUtils.gcd(batchSize, slideSize);
-		this.listSize = (int) granularity;
-	}
 
 	@Override
 	protected void mutableInvoke() throws Exception {
@@ -53,12 +58,38 @@ public class BatchReduceInvokable<IN, OUT> extends StreamReduceInvokable<IN, OUT
 	}
 
 	@Override
-	public void open(Configuration parameters) throws Exception {
-		super.open(parameters);
-		this.state = new SlidingWindowState<IN>(batchSize, slideSize, granularity);
+	protected void immutableInvoke() throws Exception {
+		if ((reuse = recordIterator.next(reuse)) == null) {
+			throw new RuntimeException("DataStream must not be empty");
+		}
+	
+		while (reuse != null && !state.isFull()) {
+			collectOneUnit();
+		}
+		reduce();
+	
+		while (reuse != null) {
+			for (int i = 0; i < slideSize / granularity; i++) {
+				if (reuse != null) {
+					collectOneUnit();
+				}
+			}
+			reduce();
+		}
 	}
 
-	@Override
+	private void collectOneUnit() throws IOException {
+		ArrayList<StreamRecord<IN>> list;
+		list = new ArrayList<StreamRecord<IN>>(listSize);
+	
+		do {
+			list.add(reuse);
+			resetReuse();
+		} while ((reuse = recordIterator.next(reuse)) != null && batchNotFull());
+		state.pushBack(list);
+	}
+
+	
 	protected boolean batchNotFull() {
 		counter++;
 		if (counter < granularity) {
@@ -69,5 +100,31 @@ public class BatchReduceInvokable<IN, OUT> extends StreamReduceInvokable<IN, OUT
 		}
 	}
 
+	protected void reduce() {
+		userIterator = state.getIterator();
+		callUserFunctionAndLogException();
+	}
+	
+	@Override
+	protected void callUserFunction() throws Exception {
+		reducer.reduce(userIterable, collector);
+	}
+	
+	@Override
+	public void open(Configuration parameters) throws Exception {
+		super.open(parameters);
+		this.state = new SlidingWindowState<IN>(batchSize, slideSize, granularity);
+		userIterable = new BatchIterable();
+	}
+
+	protected class BatchIterable implements Iterable<IN> {
+
+		@Override
+		public Iterator<IN> iterator() {
+			return userIterator;
+		}
+
+	}
+
 
 }
\ No newline at end of file

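As a worked example of the batching arithmetic above (the sizes are chosen only for illustration, and the eviction behaviour of SlidingWindowState is assumed from its use here): with batchSize = 6 and slideSize = 4, granularity = gcd(6, 4) = 2, so collectOneUnit() gathers two records at a time and the state holds batchSize / granularity = 3 such units.

	units pushed:   [r1 r2] [r3 r4] [r5 r6] [r7 r8] [r9 r10] ...
	1st reduce():   r1 .. r6    (the state is full for the first time)
	2nd reduce():   r5 .. r10   (slideSize / granularity = 2 fresh units pushed,
	                             the two oldest units drop out of the window)
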
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/90ad6b04/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupReduceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupReduceInvokable.java
index 6f59fb0..73523d9 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupReduceInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupReduceInvokable.java
@@ -18,43 +18,23 @@
 package org.apache.flink.streaming.api.invokable.operator;
 
 import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.streaming.api.invokable.UserTaskInvokable;
 import org.apache.flink.streaming.state.MutableTableState;
 
-public class GroupReduceInvokable<IN> extends UserTaskInvokable<IN, IN> {
+public class GroupReduceInvokable<IN> extends StreamReduceInvokable<IN> {
 	private static final long serialVersionUID = 1L;
 
-	private ReduceFunction<IN> reducer;
 	private int keyPosition;
 	private MutableTableState<Object, IN> values;
+	private IN reduced;
 
 	public GroupReduceInvokable(ReduceFunction<IN> reducer, int keyPosition) {
 		super(reducer);
-		this.reducer = reducer;
 		this.keyPosition = keyPosition;
 		values = new MutableTableState<Object, IN>();
 	}
 
 	@Override
-	protected void immutableInvoke() throws Exception {
-		while ((reuse = recordIterator.next(reuse)) != null) {
-			reduce();
-			resetReuse();
-		}
-	}
-
-	@Override
-	protected void mutableInvoke() throws Exception {
-		while ((reuse = recordIterator.next(reuse)) != null) {
-			reduce();
-		}
-	}
-
-	private IN reduced;
-	private IN nextValue;
-	private IN currentValue;
-	
-	private void reduce() throws Exception {
+	protected void reduce() throws Exception {
 		Object key = reuse.getField(keyPosition);
 		currentValue = values.get(key);
 		nextValue = reuse.getObject();

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/90ad6b04/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceInvokable.java
index 288b2c6..52039be 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceInvokable.java
@@ -17,88 +17,51 @@
 
 package org.apache.flink.streaming.api.invokable.operator;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Iterator;
-
-import org.apache.flink.api.common.functions.Function;
-import org.apache.flink.api.common.functions.GroupReduceFunction;
-import org.apache.flink.configuration.Configuration;
+import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.streaming.api.invokable.UserTaskInvokable;
-import org.apache.flink.streaming.api.streamrecord.StreamRecord;
-import org.apache.flink.streaming.state.SlidingWindowState;
-
-public abstract class StreamReduceInvokable<IN, OUT> extends UserTaskInvokable<IN, OUT> {
-
-	public StreamReduceInvokable(Function userFunction) {
-		super(userFunction);
-	}
 
+public class StreamReduceInvokable<IN> extends UserTaskInvokable<IN, IN> {
 	private static final long serialVersionUID = 1L;
-	protected GroupReduceFunction<IN, OUT> reducer;
-	protected BatchIterator<IN> userIterator;
-	protected BatchIterable userIterable;
-	protected long slideSize;
-	protected long granularity;
-	protected int listSize;
-	protected transient SlidingWindowState<IN> state;
 
-	@Override
-	public void open(Configuration parameters) throws Exception {
-		userIterable = new BatchIterable();
-		super.open(parameters);
+	protected ReduceFunction<IN> reducer;
+	protected IN currentValue;
+	protected IN nextValue;
+
+	public StreamReduceInvokable(ReduceFunction<IN> reducer) {
+		super(reducer);
+		this.reducer = reducer;
+		currentValue = null;
 	}
 
 	@Override
 	protected void immutableInvoke() throws Exception {
-		if ((reuse = recordIterator.next(reuse)) == null) {
-			throw new RuntimeException("DataStream must not be empty");
-		}
-
-		while (reuse != null && !state.isFull()) {
-			collectOneUnit();
+		while ((reuse = recordIterator.next(reuse)) != null) {
+			reduce();
+			resetReuse();
 		}
-		reduce();
+	}
 
-		while (reuse != null) {
-			for (int i = 0; i < slideSize / granularity; i++) {
-				if (reuse != null) {
-					collectOneUnit();
-				}
-			}
+	@Override
+	protected void mutableInvoke() throws Exception {
+		while ((reuse = recordIterator.next(reuse)) != null) {
 			reduce();
 		}
 	}
 
-	protected void reduce() {
-		userIterator = state.getIterator();
+	protected void reduce() throws Exception {
+		nextValue = reuse.getObject();
 		callUserFunctionAndLogException();
-	}
-	
-	@Override
-	protected void callUserFunction() throws Exception {
-		reducer.reduce(userIterable, collector);
-	}
-	
-	private void collectOneUnit() throws IOException {
-		ArrayList<StreamRecord<IN>> list;
-		list = new ArrayList<StreamRecord<IN>>(listSize);
 
-		do {
-			list.add(reuse);
-			resetReuse();
-		} while ((reuse = recordIterator.next(reuse)) != null && batchNotFull());
-		state.pushBack(list);
 	}
 
-	protected abstract boolean batchNotFull();
-
-	protected class BatchIterable implements Iterable<IN> {
-
-		@Override
-		public Iterator<IN> iterator() {
-			return userIterator;
+	@Override
+	protected void callUserFunction() throws Exception {
+		if (currentValue != null) {
+			currentValue = reducer.reduce(currentValue, nextValue);
+		} else {
+			currentValue = nextValue;
 		}
+		collector.collect(currentValue);
 
 	}
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/90ad6b04/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowGroupReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowGroupReduceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowGroupReduceInvokable.java
index fea5620..dbba50e 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowGroupReduceInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowGroupReduceInvokable.java
@@ -17,58 +17,54 @@
 
 package org.apache.flink.streaming.api.invokable.operator;
 
+import java.util.ArrayList;
 import java.util.Iterator;
+import java.util.List;
 
-import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.functions.GroupReduceFunction;
 import org.apache.flink.streaming.api.streamrecord.StreamRecord;
 import org.apache.flink.streaming.state.MutableTableState;
 
-public class WindowGroupReduceInvokable<IN> extends WindowReduceInvokable<IN, IN> {
+public class WindowGroupReduceInvokable<IN, OUT> extends WindowReduceInvokable<IN, OUT> {
 
 	int keyPosition;
-	protected ReduceFunction<IN> reducer;
+	protected GroupReduceFunction<IN, OUT> reducer;
 	private Iterator<StreamRecord<IN>> iterator;
-	private MutableTableState<Object, IN> values;
+	private MutableTableState<Object, List<IN>> values;
 
-	public WindowGroupReduceInvokable(ReduceFunction<IN> reduceFunction, long windowSize,
+	public WindowGroupReduceInvokable(GroupReduceFunction<IN, OUT> reduceFunction, long windowSize,
 			long slideInterval, int keyPosition) {
 		super(reduceFunction, windowSize, slideInterval);
 		this.keyPosition = keyPosition;
 		this.reducer = reduceFunction;
-		values = new MutableTableState<Object, IN>();
+		values = new MutableTableState<Object, List<IN>>();
 	}
 
-	private IN reduced;
 	private IN nextValue;
-	private IN currentValue;
-	
+
 	@Override
 	protected void reduce() {
 		iterator = state.getStreamRecordIterator();
 		while (iterator.hasNext()) {
 			StreamRecord<IN> nextRecord = iterator.next();
-
-			nextValue = nextRecord.getObject();
 			Object key = nextRecord.getField(keyPosition);
+			nextValue = nextRecord.getObject();
 
-			currentValue = values.get(key);
-			if (currentValue != null) {
-				callUserFunctionAndLogException();
-				values.put(key, reduced);
-				collector.collect(reduced);
+			List<IN> group = values.get(key);
+			if (group != null) {
+				group.add(nextValue);
 			} else {
-				values.put(key, nextValue);
-				collector.collect(nextValue);
+				group = new ArrayList<IN>();
+				group.add(nextValue);
+				values.put(key, group);
 			}
 		}
+		for (List<IN> group : values.values()) {
+			userIterable = group;
+			callUserFunctionAndLogException();
+		}
 		values.clear();
 	}
-
-	@Override
-	protected void callUserFunction() throws Exception {
-		reduced = reducer.reduce(currentValue, nextValue);
-	}
-	
 	private static final long serialVersionUID = 1L;
 
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/90ad6b04/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowReduceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowReduceInvokable.java
index 3f43a11..48652c1 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowReduceInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowReduceInvokable.java
@@ -17,35 +17,16 @@
 
 package org.apache.flink.streaming.api.invokable.operator;
 
-import org.apache.commons.math.util.MathUtils;
 import org.apache.flink.api.common.functions.GroupReduceFunction;
-import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.state.SlidingWindowState;
 
-public class WindowReduceInvokable<IN, OUT> extends StreamReduceInvokable<IN, OUT> {
+public class WindowReduceInvokable<IN, OUT> extends BatchReduceInvokable<IN, OUT> {
 	private static final long serialVersionUID = 1L;
-	private long windowSize;
-	volatile boolean isRunning;
 	private long startTime;
 
 	public WindowReduceInvokable(GroupReduceFunction<IN, OUT> reduceFunction, long windowSize,
 			long slideInterval) {
-		super(reduceFunction);
-		this.reducer = reduceFunction;
-		this.windowSize = windowSize;
-		this.slideSize = slideInterval;
-		this.granularity = MathUtils.gcd(windowSize, slideSize);
-		this.listSize = (int) granularity;
-	}
-	
-	public WindowReduceInvokable(ReduceFunction<IN> reduceFunction, long windowSize,
-			long slideInterval) {
-		super(reduceFunction);
-		this.windowSize = windowSize;
-		this.slideSize = slideInterval;
-		this.granularity = MathUtils.gcd(windowSize, slideSize);
-		this.listSize = (int) granularity;
+		super(reduceFunction, windowSize, slideInterval);
 	}
 
 	@Override
@@ -63,7 +44,6 @@ public class WindowReduceInvokable<IN, OUT> extends StreamReduceInvokable<IN, OU
 	public void open(Configuration parameters) throws Exception {
 		super.open(parameters);
 		startTime = System.currentTimeMillis();
-		this.state = new SlidingWindowState<IN>(windowSize, slideSize, granularity);
 	}
 
 	protected void mutableInvoke() throws Exception {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/90ad6b04/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoReduceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoReduceInvokable.java
new file mode 100755
index 0000000..c9a313f
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoReduceInvokable.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.invokable.operator.co;
+
+import org.apache.flink.streaming.api.function.co.CoReduceFunction;
+
+public class CoReduceInvokable<IN1, IN2, OUT> extends CoInvokable<IN1, IN2, OUT> {
+	private static final long serialVersionUID = 1L;
+
+	private CoReduceFunction<IN1, IN2, OUT> coReducer;
+	private IN1 currentValue1 = null;
+	private IN2 currentValue2 = null;
+
+	public CoReduceInvokable(CoReduceFunction<IN1, IN2, OUT> coReducer) {
+		super(coReducer);
+		this.coReducer = coReducer;
+	}
+
+	@Override
+	public void handleStream1() throws Exception {
+		IN1 nextValue = reuse1.getObject();
+		if (currentValue1 != null) {
+			currentValue1 = coReducer.reduce1(currentValue1, nextValue);
+			collector.collect(coReducer.map1(currentValue1));
+		} else {
+			currentValue1 = nextValue;
+			collector.collect(coReducer.map1(nextValue));
+		}
+	}
+
+	@Override
+	public void handleStream2() throws Exception {
+		IN2 nextValue = reuse2.getObject();
+		if (currentValue2 != null) {
+			currentValue2 = coReducer.reduce2(currentValue2, nextValue);
+			collector.collect(coReducer.map2(currentValue2));
+		} else {
+			currentValue2 = nextValue;
+			collector.collect(coReducer.map2(nextValue));
+		}
+	}
+
+}

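Traced on hypothetical inputs, the invokable above keeps one running aggregate per input and emits one mapped value per element: with a and b arriving on the first input and x on the second, it collects

	map1(a), map1(reduce1(a, b)), map2(x)

so each input is reduced independently, but both feed the same output stream.
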
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/90ad6b04/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/TableState.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/TableState.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/TableState.java
index 38cd469..4fe9817 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/TableState.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/TableState.java
@@ -18,6 +18,7 @@
 package org.apache.flink.streaming.state;
 
 import java.io.Serializable;
+import java.util.Collection;
 import java.util.LinkedHashMap;
 import java.util.Map;
 
@@ -27,7 +28,7 @@ import java.util.Map;
 @SuppressWarnings("serial")
 public class TableState<K, V> implements Serializable {
 
-	protected Map<K, V> state=new LinkedHashMap<K, V>();
+	protected Map<K, V> state = new LinkedHashMap<K, V>();
 
 	public void put(K key, V value) {
 		state.put(key, value);
@@ -48,8 +49,12 @@ public class TableState<K, V> implements Serializable {
 	public TableStateIterator<K, V> getIterator() {
 		return new TableStateIterator<K, V>(state.entrySet().iterator());
 	}
-	
-	public void clear(){
+
+	public Collection<V> values() {
+		return state.values();
+	}
+
+	public void clear() {
 		state.clear();
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/90ad6b04/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCountLocal.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCountLocal.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCountLocal.java
index cea90b0..e554d91 100644
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCountLocal.java
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCountLocal.java
@@ -57,7 +57,7 @@ public class WordCountLocal {
 	public static void main(String[] args) {
 
 		TestDataUtil.downloadIfNotExists("hamlet.txt");
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
 
 		DataStream<Tuple2<String, Integer>> dataStream = env
 				.readTextFile("src/test/resources/testdata/hamlet.txt")

