flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mbala...@apache.org
Subject [14/18] git commit: [streaming] GroupedBatchReduce modified to batch by key, updated WindowReduce + Batch/Window reduce refactor
Date Sat, 20 Sep 2014 13:10:57 GMT
[streaming] GroupedBatchReduce modified to batch by key, updated WindowReduce + Batch/Window reduce refactor


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/6b6951ef
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/6b6951ef
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/6b6951ef

Branch: refs/heads/master
Commit: 6b6951eff685669e2e444fdedf9251fb71bba423
Parents: 48d8ed7
Author: Gyula Fora <gyfora@apache.org>
Authored: Tue Sep 16 17:55:08 2014 +0200
Committer: mbalassi <balassi.marton@gmail.com>
Committed: Sat Sep 20 13:44:12 2014 +0200

----------------------------------------------------------------------
 docs/streaming_guide.md                         |   4 +-
 .../api/datastream/BatchedDataStream.java       |   3 +-
 .../api/datastream/GroupedDataStream.java       | 135 +------
 .../api/datastream/WindowDataStream.java        |   2 +-
 .../api/invokable/StreamInvokable.java          |   3 +
 .../operator/BatchReduceInvokable.java          | 169 ++++++++-
 .../operator/GroupedBatchReduceInvokable.java   |  69 +---
 .../operator/GroupedWindowReduceInvokable.java  |  99 +++--
 .../operator/WindowReduceInvokable.java         | 123 ++++---
 .../streaming/state/NullableCircularBuffer.java | 362 +++++++++++++++++++
 .../api/invokable/operator/BatchReduceTest.java |   9 +-
 .../operator/GroupedBatchReduceTest.java        |  35 +-
 .../operator/WindowReduceInvokableTest.java     |  40 +-
 13 files changed, 733 insertions(+), 320 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6b6951ef/docs/streaming_guide.md
----------------------------------------------------------------------
diff --git a/docs/streaming_guide.md b/docs/streaming_guide.md
index 6ed53df..c1c6cde 100644
--- a/docs/streaming_guide.md
+++ b/docs/streaming_guide.md
@@ -261,8 +261,10 @@ For every incoming tuple the selected field is replaced with the current aggrega
 
 Window and batch operators allow the user to execute function on slices or windows of the DataStream in a sliding fashion. If the stepsize for the slide is not defined then the window/batchsize is used as stepsize by default.
 
-When applied to grouped data streams the operators applied will be executed on groups of elements grouped by the selected key position.
+When applied to grouped data streams, the stream is batched/windowed separately for each key value.
 
+For example, `ds.groupBy(0).batch(100, 10)` will produce batches of the last 100 elements for each key value, with a step size of 10 records.
+ 
 #### Reduce on windowed/batched data streams
 The transformation calls a user-defined `ReduceFunction` on records received in the batch or during the predefined time window. The window is shifted after each reduce call. The user can also use the different streaming aggregations.
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6b6951ef/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/BatchedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/BatchedDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/BatchedDataStream.java
index bcedac9..0249a1f 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/BatchedDataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/BatchedDataStream.java
@@ -25,6 +25,7 @@ import org.apache.flink.streaming.api.function.aggregation.AggregationFunction;
 import org.apache.flink.streaming.api.function.aggregation.MaxAggregationFunction;
 import org.apache.flink.streaming.api.function.aggregation.MinAggregationFunction;
 import org.apache.flink.streaming.api.function.aggregation.SumAggregationFunction;
+import org.apache.flink.streaming.api.invokable.StreamOperatorInvokable;
 import org.apache.flink.streaming.api.invokable.operator.BatchGroupReduceInvokable;
 import org.apache.flink.streaming.api.invokable.operator.BatchReduceInvokable;
 import org.apache.flink.streaming.api.invokable.operator.GroupedBatchGroupReduceInvokable;
@@ -199,7 +200,7 @@ public class BatchedDataStream<OUT> {
 	}
 
 	private SingleOutputStreamOperator<OUT, ?> aggregate(AggregationFunction<OUT> aggregate) {
-		BatchReduceInvokable<OUT> invokable = getReduceInvokable(aggregate);
+		StreamOperatorInvokable<OUT, OUT> invokable = getReduceInvokable(aggregate);
 
 		SingleOutputStreamOperator<OUT, ?> returnStream = dataStream.addFunction("batchReduce",
 				aggregate, dataStream.outTypeWrapper, dataStream.outTypeWrapper, invokable);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6b6951ef/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
index e30d316..8978b19 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
@@ -17,16 +17,10 @@
 
 package org.apache.flink.streaming.api.datastream;
 
-import org.apache.flink.api.common.functions.GroupReduceFunction;
 import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.java.functions.RichGroupReduceFunction;
 import org.apache.flink.api.java.functions.RichReduceFunction;
 import org.apache.flink.streaming.api.function.aggregation.AggregationFunction;
-import org.apache.flink.streaming.api.invokable.operator.GroupedBatchGroupReduceInvokable;
 import org.apache.flink.streaming.api.invokable.operator.GroupedReduceInvokable;
-import org.apache.flink.streaming.api.invokable.operator.GroupedWindowGroupReduceInvokable;
-import org.apache.flink.streaming.api.invokable.util.DefaultTimeStamp;
-import org.apache.flink.streaming.api.invokable.util.TimeStamp;
 import org.apache.flink.streaming.partitioner.StreamPartitioner;
 import org.apache.flink.streaming.util.serialization.FunctionTypeWrapper;
 
@@ -73,132 +67,6 @@ public class GroupedDataStream<OUT> extends DataStream<OUT> {
 	}
 
 	/**
-	 * Applies a group reduce transformation on preset chunks of the grouped
-	 * data stream. The {@link GroupReduceFunction} will receive input values
-	 * based on the key value. Only input values with the same key will go to
-	 * the same reducer.When the reducer has ran for all the values in the
-	 * batch, the batch is slid forward.The user can also extend
-	 * {@link RichGroupReduceFunction} to gain access to other features provided
-	 * by the {@link RichFuntion} interface.
-	 * 
-	 * 
-	 * @param reducer
-	 *            The {@link GroupReduceFunction} that will be called for every
-	 *            element of the input values with the same key.
-	 * @param batchSize
-	 *            The size of the data stream chunk (the number of values in the
-	 *            batch).
-	 * @return The transformed {@link DataStream}.
-	 */
-	public <R> SingleOutputStreamOperator<R, ?> batchReduce(GroupReduceFunction<OUT, R> reducer,
-			int batchSize) {
-		return batchReduce(reducer, batchSize, batchSize);
-	}
-
-	/**
-	 * Applies a group reduce transformation on preset chunks of the grouped
-	 * data stream in a sliding window fashion. The {@link GroupReduceFunction}
-	 * will receive input values based on the key value. Only input values with
-	 * the same key will go to the same reducer. When the reducer has ran for
-	 * all the values in the batch, the batch is slid forward. The user can also
-	 * extend {@link RichGroupReduceFunction} to gain access to other features
-	 * provided by the {@link RichFuntion} interface.
-	 * 
-	 * @param reducer
-	 *            The {@link GroupReduceFunction} that will be called for every
-	 *            element of the input values with the same key.
-	 * @param batchSize
-	 *            The size of the data stream chunk (the number of values in the
-	 *            batch).
-	 * @param slideSize
-	 *            The number of values the batch is slid by.
-	 * @return The transformed {@link DataStream}.
-	 */
-	public <R> SingleOutputStreamOperator<R, ?> batchReduce(GroupReduceFunction<OUT, R> reducer,
-			long batchSize, long slideSize) {
-
-		return addFunction("batchReduce", reducer, new FunctionTypeWrapper<OUT>(reducer,
-				GroupReduceFunction.class, 0), new FunctionTypeWrapper<R>(reducer,
-				GroupReduceFunction.class, 1), new GroupedBatchGroupReduceInvokable<OUT, R>(reducer,
-				batchSize, slideSize, keyPosition));
-	}
-
-	/**
-	 * Applies a group reduce transformation on preset "time" chunks of the
-	 * grouped data stream. The {@link GroupReduceFunction} will receive input
-	 * values based on the key value. Only input values with the same key will
-	 * go to the same reducer.When the reducer has ran for all the values in the
-	 * batch, the window is shifted forward. The user can also extend
-	 * {@link RichGroupReduceFunction} to gain access to other features provided
-	 * by the {@link RichFuntion} interface.
-	 * 
-	 * 
-	 * @param reducer
-	 *            The GroupReduceFunction that is called for each time window.
-	 * @param windowSize
-	 *            SingleOutputStreamOperator The time window to run the reducer
-	 *            on, in milliseconds.
-	 * @return The transformed DataStream.
-	 */
-	public <R> SingleOutputStreamOperator<R, ?> windowReduce(GroupReduceFunction<OUT, R> reducer,
-			long windowSize) {
-		return windowReduce(reducer, windowSize, windowSize);
-	}
-
-	/**
-	 * Applies a group reduce transformation on preset "time" chunks of the
-	 * grouped data stream in a sliding window fashion. The
-	 * {@link GroupReduceFunction} will receive input values based on the key
-	 * value. Only input values with the same key will go to the same reducer.
-	 * When the reducer has ran for all the values in the batch, the window is
-	 * shifted forward. The user can also extend {@link RichGroupReduceFunction}
-	 * to gain access to other features provided by the {@link RichFuntion}
-	 * interface.
-	 *
-	 * @param reducer
-	 *            The GroupReduceFunction that is called for each time window.
-	 * @param windowSize
-	 *            SingleOutputStreamOperator The time window to run the reducer
-	 *            on, in milliseconds.
-	 * @param slideInterval
-	 *            The time interval the batch is slid by.
-	 * @return The transformed DataStream.
-	 */
-	public <R> SingleOutputStreamOperator<R, ?> windowReduce(GroupReduceFunction<OUT, R> reducer,
-			long windowSize, long slideInterval) {
-		return windowReduce(reducer, windowSize, slideInterval, new DefaultTimeStamp<OUT>());
-	}
-
-	/**
-	 * Applies a group reduce transformation on preset "time" chunks of the
-	 * grouped data stream in a sliding window fashion. The
-	 * {@link GroupReduceFunction} will receive input values based on the key
-	 * value. Only input values with the same key will go to the same reducer.
-	 * When the reducer has ran for all the values in the batch, the window is
-	 * shifted forward. The time is determined by a user-defined timestamp. The
-	 * user can also extend {@link RichGroupReduceFunction} to gain access to
-	 * other features provided by the {@link RichFuntion} interface.
-	 *
-	 * @param reducer
-	 *            The GroupReduceFunction that is called for each time window.
-	 * @param windowSize
-	 *            SingleOutputStreamOperator The time window to run the reducer
-	 *            on, in milliseconds.
-	 * @param slideInterval
-	 *            The time interval the batch is slid by.
-	 * @param timestamp
-	 *            Timestamp function to retrieve a timestamp from an element.
-	 * @return The transformed DataStream.
-	 */
-	public <R> SingleOutputStreamOperator<R, ?> windowReduce(GroupReduceFunction<OUT, R> reducer,
-			long windowSize, long slideInterval, TimeStamp<OUT> timestamp) {
-		return addFunction("batchReduce", reducer, new FunctionTypeWrapper<OUT>(reducer,
-				GroupReduceFunction.class, 0), new FunctionTypeWrapper<R>(reducer,
-				GroupReduceFunction.class, 1), new GroupedWindowGroupReduceInvokable<OUT, R>(reducer,
-				windowSize, slideInterval, keyPosition, timestamp));
-	}
-
-	/**
 	 * Applies an aggregation that sums the grouped data stream at the given
 	 * position, grouped by the given key position. Input values with the same
 	 * key will be summed.
@@ -240,7 +108,8 @@ public class GroupedDataStream<OUT> extends DataStream<OUT> {
 	@Override
 	protected SingleOutputStreamOperator<OUT, ?> aggregate(AggregationFunction<OUT> aggregate) {
 
-		GroupedReduceInvokable<OUT> invokable = new GroupedReduceInvokable<OUT>(aggregate, keyPosition);
+		GroupedReduceInvokable<OUT> invokable = new GroupedReduceInvokable<OUT>(aggregate,
+				keyPosition);
 
 		SingleOutputStreamOperator<OUT, ?> returnStream = addFunction("groupReduce", aggregate,
 				outTypeWrapper, outTypeWrapper, invokable);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6b6951ef/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowDataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowDataStream.java
index 4756050..7a53d1a 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowDataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowDataStream.java
@@ -57,7 +57,7 @@ public class WindowDataStream<OUT> extends BatchedDataStream<OUT> {
 	}
 
 	protected BatchReduceInvokable<OUT> getReduceInvokable(ReduceFunction<OUT> reducer) {
-		BatchReduceInvokable<OUT> invokable;
+		WindowReduceInvokable<OUT> invokable;
 		if (isGrouped) {
 			invokable = new GroupedWindowReduceInvokable<OUT>(reducer, batchSize, slideSize,
 					keyPosition, timeStamp);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6b6951ef/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java
index 9a6f2cc..342cc7a 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java
@@ -37,6 +37,7 @@ public abstract class StreamInvokable<OUT> implements Serializable {
 
 	protected Collector<OUT> collector;
 	protected Function userFunction;
+	protected volatile boolean isRunning;
 
 	public StreamInvokable(Function userFunction) {
 		this.userFunction = userFunction;
@@ -54,6 +55,7 @@ public abstract class StreamInvokable<OUT> implements Serializable {
 	 *            The configuration parameters for the operator
 	 */
 	public void open(Configuration parameters) throws Exception {
+		isRunning=true;
 		if (userFunction instanceof RichFunction) {
 			((RichFunction) userFunction).open(parameters);
 		}
@@ -65,6 +67,7 @@ public abstract class StreamInvokable<OUT> implements Serializable {
 	 * 
 	 */
 	public void close() throws Exception {
+		isRunning = false;
 		if (userFunction instanceof RichFunction) {
 			((RichFunction) userFunction).close();
 		}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6b6951ef/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceInvokable.java
index cfca1ab..d713f6d 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceInvokable.java
@@ -17,64 +17,193 @@
 
 package org.apache.flink.streaming.api.invokable.operator;
 
+import java.io.Serializable;
 import java.util.Iterator;
 
+import org.apache.commons.math.util.MathUtils;
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.invokable.StreamOperatorInvokable;
+import org.apache.flink.streaming.api.streamrecord.StreamRecord;
+import org.apache.flink.streaming.state.NullableCircularBuffer;
 
-public class BatchReduceInvokable<OUT> extends BatchGroupReduceInvokable<OUT, OUT> {
+public class BatchReduceInvokable<OUT> extends StreamOperatorInvokable<OUT, OUT> {
 
 	private static final long serialVersionUID = 1L;
 	protected ReduceFunction<OUT> reducer;
 	protected TypeSerializer<OUT> typeSerializer;
-	protected OUT reduceReuse;
+
+	protected long slideSize;
+
+	protected long batchSize;
+	protected int granularity;
+	protected long batchPerSlide;
+	protected long numberOfBatches;
+	protected StreamBatch batch;
+	protected StreamBatch currentBatch;
 
 	public BatchReduceInvokable(ReduceFunction<OUT> reduceFunction, long batchSize, long slideSize) {
-		super(null, batchSize, slideSize);
+		super(reduceFunction);
 		this.reducer = reduceFunction;
+		this.batchSize = batchSize;
+		this.slideSize = slideSize;
+		this.granularity = (int) MathUtils.gcd(batchSize, slideSize);
+		this.batchPerSlide = slideSize / granularity;
+		this.numberOfBatches = batchSize / granularity;
+		this.batch = new StreamBatch();
 	}
 
-	protected void collectOneUnit() throws Exception {
-		OUT reduced = null;
-		if (batchNotFull()) {
-			reduced = reuse.getObject();
+	@Override
+	protected void immutableInvoke() throws Exception {
+		if ((reuse = recordIterator.next(reuse)) == null) {
+			throw new RuntimeException("DataStream must not be empty");
+
+		}
+
+		while (reuse != null) {		
+			StreamBatch batch = getBatch(reuse);
+
+			batch.reduceToBuffer(reuse);
+
 			resetReuse();
-			while (getNextRecord() != null && batchNotFull()) {
-				reduced = reducer.reduce(reduced, reuse.getObject());
-				resetReuse();
-			}
+			reuse = recordIterator.next(reuse);
 		}
-		state.pushBack(reduced);
+		
+		reduceLastBatch();
+
+	}
+
+	protected void reduceLastBatch() throws Exception {
+		batch.reduceLastBatch();		
+	}
+
+	protected StreamBatch getBatch(StreamRecord<OUT> next) {
+		return batch;
 	}
 
 	@Override
-	protected void reduce() {
+	// TODO: implement mutableInvoke for reduce
+	protected void mutableInvoke() throws Exception {
+		System.out.println("Immutable setting is used");
+		immutableInvoke();
+	}
+
+	protected void reduce(StreamBatch batch) {
+		this.currentBatch = batch;
 		callUserFunctionAndLogException();
 	}
 
 	@Override
 	protected void callUserFunction() throws Exception {
-		Iterator<OUT> reducedIterator = state.getBufferIterator();
-		OUT reduced;
-		do {
+		Iterator<OUT> reducedIterator = currentBatch.getIterator();
+		OUT reduced = null;
+
+		while (reducedIterator.hasNext() && reduced == null) {
 			reduced = reducedIterator.next();
-		} while (reducedIterator.hasNext() && reduced == null);
+		}
 
 		while (reducedIterator.hasNext()) {
 			OUT next = reducedIterator.next();
 			if (next != null) {
-				next = typeSerializer.copy(next, reduceReuse);
 				reduced = reducer.reduce(reduced, next);
 			}
 		}
-		collector.collect(reduced);
+		if (reduced != null) {
+			collector.collect(reduced);
+		}
 	}
 
 	@Override
 	public void open(Configuration config) throws Exception {
 		super.open(config);
 		this.typeSerializer = serializer.getObjectSerializer();
-		this.reduceReuse = typeSerializer.createInstance();
 	}
+
+	protected class StreamBatch implements Serializable {
+
+		private static final long serialVersionUID = 1L;
+		protected long counter;
+		protected long minibatchCounter;
+		protected OUT currentValue;
+
+		protected NullableCircularBuffer circularBuffer;
+
+		public StreamBatch() {
+
+			this.circularBuffer = new NullableCircularBuffer((int) (batchSize / granularity));
+			this.counter = 0;
+			this.minibatchCounter = 0;
+
+		}
+
+		public void reduceToBuffer(StreamRecord<OUT> next) throws Exception {
+			OUT nextValue = next.getObject();
+			if (currentValue != null) {
+				currentValue = reducer.reduce(currentValue, nextValue);
+			} else {
+				currentValue = nextValue;
+			}
+
+			counter++;
+
+			if (miniBatchEnd()) {
+				addToBuffer();
+				if (batchEnd()) {
+					reduceBatch();
+				}
+			}
+
+		}
+
+		protected void addToBuffer() {
+			circularBuffer.add(currentValue);
+			minibatchCounter++;
+			currentValue = null;
+		}
+
+		protected boolean miniBatchEnd() {
+			return (counter % granularity) == 0;
+		}
+
+		public boolean batchEnd() {
+			if (counter == batchSize) {
+				counter -= slideSize;
+				minibatchCounter -= batchPerSlide;
+				return true;
+			}
+			return false;
+		}
+
+		public void reduceLastBatch() throws Exception {
+			if (miniBatchInProgress()) {
+				addToBuffer();
+			}
+
+			if (minibatchCounter >= 0) {
+				for (long i = 0; i < (numberOfBatches - minibatchCounter); i++) {
+					circularBuffer.remove();
+				}
+				if (!circularBuffer.isEmpty()) {
+					reduce(this);
+				}
+			}
+
+		}
+		
+		public boolean miniBatchInProgress(){
+			return currentValue != null;
+		}
+
+		public void reduceBatch() {
+			reduce(this);
+		}
+
+		@SuppressWarnings("unchecked")
+		public Iterator<OUT> getIterator() {
+			return circularBuffer.iterator();
+		}
+
+	}
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6b6951ef/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedBatchReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedBatchReduceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedBatchReduceInvokable.java
index c173932..f842073 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedBatchReduceInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedBatchReduceInvokable.java
@@ -18,77 +18,40 @@
 package org.apache.flink.streaming.api.invokable.operator;
 
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.Map;
-import java.util.Map.Entry;
 
 import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.state.SlidingWindowState;
+import org.apache.flink.streaming.api.streamrecord.StreamRecord;
 
 public class GroupedBatchReduceInvokable<OUT> extends BatchReduceInvokable<OUT> {
 
 	private static final long serialVersionUID = 1L;
-	protected transient SlidingWindowState<Map<Object, OUT>> intermediateValues;
-
-	private int keyPosition;
+	int keyPosition;
+	Map<Object, StreamBatch> streamBatches;
 
 	public GroupedBatchReduceInvokable(ReduceFunction<OUT> reduceFunction, long batchSize,
 			long slideSize, int keyPosition) {
 		super(reduceFunction, batchSize, slideSize);
 		this.keyPosition = keyPosition;
-	}
-
-	protected void collectOneUnit() throws Exception {
-		Map<Object, OUT> values = new HashMap<Object, OUT>();
-		if (batchNotFull()) {
-			do {
-				Object key = reuse.getField(keyPosition);
-				OUT nextValue = reuse.getObject();
-				OUT currentValue = values.get(key);
-				if (currentValue == null) {
-					values.put(key, nextValue);
-				} else {
-					values.put(key, reducer.reduce(currentValue, nextValue));
-				}
-				resetReuse();
-			} while (getNextRecord() != null && batchNotFull());
-		}
-		intermediateValues.pushBack(values);
+		this.streamBatches = new HashMap<Object, StreamBatch>();
 	}
 
 	@Override
-	protected boolean isStateFull() {
-		return intermediateValues.isFull();
+	protected void reduceLastBatch() throws Exception {
+		for(StreamBatch batch: streamBatches.values()){
+			batch.reduceLastBatch();
+		}		
 	}
 
 	@Override
-	protected void callUserFunction() throws Exception {
-		Iterator<Map<Object, OUT>> reducedIterator = intermediateValues.getBufferIterator();
-		Map<Object, OUT> reducedValues = reducedIterator.next();
-
-		while (reducedIterator.hasNext()) {
-			Map<Object, OUT> nextValues = reducedIterator.next();
-			for (Entry<Object, OUT> entry : nextValues.entrySet()) {
-				OUT currentValue = reducedValues.get(entry.getKey());
-				if (currentValue == null) {
-					reducedValues.put(entry.getKey(), entry.getValue());
-				} else {
-					OUT next = typeSerializer.copy(entry.getValue(), reduceReuse);
-					reducedValues.put(entry.getKey(), reducer.reduce(currentValue, next));
-				}
-			}
+	protected StreamBatch getBatch(StreamRecord<OUT> next) {
+		Object key = next.getField(keyPosition);
+		StreamBatch batch = streamBatches.get(key);
+		if(batch == null){
+			batch=new StreamBatch();
+			streamBatches.put(key, batch);
 		}
-		for (OUT value : reducedValues.values()) {
-			collector.collect(value);
-		}
-	}
-
-	@Override
-	public void open(Configuration parameters) throws Exception {
-		super.open(parameters);
-		this.intermediateValues = new SlidingWindowState<Map<Object, OUT>>(batchSize, slideSize,
-				granularity);
+		return batch;
 	}
 
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6b6951ef/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowReduceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowReduceInvokable.java
index df94843..1214347 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowReduceInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowReduceInvokable.java
@@ -17,44 +17,99 @@
 
 package org.apache.flink.streaming.api.invokable.operator;
 
-import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
 
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.streaming.api.invokable.util.TimeStamp;
 import org.apache.flink.streaming.api.streamrecord.StreamRecord;
 
-public class GroupedWindowReduceInvokable<OUT> extends GroupedBatchReduceInvokable<OUT> {
+public class GroupedWindowReduceInvokable<OUT> extends WindowReduceInvokable<OUT> {
 
 	private static final long serialVersionUID = 1L;
-
-	private TimeStamp<OUT> timestamp;
-	private long startTime;
-	private long nextRecordTime;
+	private int keyPosition;
 
 	public GroupedWindowReduceInvokable(ReduceFunction<OUT> reduceFunction, long windowSize,
 			long slideInterval, int keyPosition, TimeStamp<OUT> timestamp) {
-		super(reduceFunction, windowSize, slideInterval, keyPosition);
-		this.timestamp = timestamp;
-		this.startTime = timestamp.getStartTime();
+		super(reduceFunction, windowSize, slideInterval, timestamp);
+		this.keyPosition = keyPosition;
+		this.window = new GroupedStreamWindow();
+		this.batch = this.window;
 	}
-
+	
 	@Override
-	protected StreamRecord<OUT> getNextRecord() throws IOException {
-		reuse = recordIterator.next(reuse);
-		if (reuse != null) {
-			nextRecordTime = timestamp.getTimestamp(reuse.getObject());
+	protected void callUserFunction() throws Exception {	
+		@SuppressWarnings("unchecked")
+		Iterator<Map<Object, OUT>> reducedIterator = (Iterator<Map<Object, OUT>>) batch.getIterator();
+		Map<Object, OUT> reducedValues = reducedIterator.next();
+
+		while (reducedIterator.hasNext()) {
+			Map<Object, OUT> nextValues = reducedIterator.next();
+			for (Entry<Object, OUT> entry : nextValues.entrySet()) {
+				OUT currentValue = reducedValues.get(entry.getKey());
+				if (currentValue == null) {
+					reducedValues.put(entry.getKey(), entry.getValue());
+				} else {
+					reducedValues.put(entry.getKey(), reducer.reduce(currentValue, entry.getValue()));
+				}
+			}
+		}
+		for (OUT value : reducedValues.values()) {
+			collector.collect(value);
 		}
-		return reuse;
 	}
+	
 
-	@Override
-	protected boolean batchNotFull() {
-		if (nextRecordTime < startTime + granularity) {
-			return true;
-		} else {
-			startTime += granularity;
-			return false;
+	protected class GroupedStreamWindow extends StreamWindow {
+
+		private static final long serialVersionUID = 1L;
+		private Map<Object, OUT> currentValues;
+
+		public GroupedStreamWindow() {
+			super();
+			this.currentValues  = new HashMap<Object, OUT>();
 		}
+
+		@Override
+		public void reduceToBuffer(StreamRecord<OUT> next) throws Exception {
+
+			OUT nextValue = next.getObject();
+			Object key = next.getField(keyPosition);
+			checkBatchEnd(timestamp.getTimestamp(nextValue));
+
+			OUT currentValue = currentValues.get(key);
+			if (currentValue != null) {
+				currentValues.put(key, reducer.reduce(currentValue, nextValue));
+			}else{
+				currentValues.put(key, nextValue);
+			}
+
+		}
+		
+		@Override
+		public boolean miniBatchInProgress() {
+			return !currentValues.isEmpty();
+		};
+
+		@SuppressWarnings("unchecked")
+		@Override
+		protected void addToBuffer() {
+			Map<Object, OUT> reuseMap;
+			
+			if (circularBuffer.isFull()) {
+				reuseMap = (Map<Object, OUT>) circularBuffer.remove();
+				reuseMap.clear();
+			} else {
+				reuseMap = new HashMap<Object, OUT>(currentValues.size());
+			}
+			
+			circularBuffer.add(currentValues);
+			minibatchCounter++;
+			currentValues = reuseMap;
+		}
+
 	}
 
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6b6951ef/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowReduceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowReduceInvokable.java
index bd51c65..b653f95 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowReduceInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowReduceInvokable.java
@@ -17,88 +17,105 @@
 
 package org.apache.flink.streaming.api.invokable.operator;
 
-import java.io.IOException;
-import java.util.Iterator;
-
 import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.invokable.util.DefaultTimeStamp;
 import org.apache.flink.streaming.api.invokable.util.TimeStamp;
 import org.apache.flink.streaming.api.streamrecord.StreamRecord;
 
 public class WindowReduceInvokable<OUT> extends BatchReduceInvokable<OUT> {
 	private static final long serialVersionUID = 1L;
-	private long startTime;
-	private long nextRecordTime;
-	private TimeStamp<OUT> timestamp;
-	private String nullElement = "nullElement";
+	protected long startTime;
+	protected long nextRecordTime;
+	protected TimeStamp<OUT> timestamp;
+	protected StreamWindow window;
 
 	public WindowReduceInvokable(ReduceFunction<OUT> reduceFunction, long windowSize,
 			long slideInterval, TimeStamp<OUT> timestamp) {
 		super(reduceFunction, windowSize, slideInterval);
 		this.timestamp = timestamp;
 		this.startTime = timestamp.getStartTime();
+		this.window = new StreamWindow();
+		this.batch = this.window;
 	}
 
-	protected StreamRecord<OUT> getNextRecord() throws IOException {
-		reuse = recordIterator.next(reuse);
-		if (reuse != null) {
-			nextRecordTime = timestamp.getTimestamp(reuse.getObject());
+	protected class StreamWindow extends StreamBatch {
+
+		private static final long serialVersionUID = 1L;
+
+		public StreamWindow() {
+			super();
+
 		}
-		return reuse;
-	}
 
-	@Override
-	protected boolean batchNotFull() {
-		if (nextRecordTime < startTime + granularity) {
-			return true;
-		} else {
-			startTime += granularity;
-			return false;
+		@Override
+		public void reduceToBuffer(StreamRecord<OUT> next) throws Exception {
+			OUT nextValue = next.getObject();
+			
+			checkBatchEnd(timestamp.getTimestamp(nextValue));
+			
+			if (currentValue != null) {
+				currentValue = reducer.reduce(currentValue, nextValue);
+			} else {
+				currentValue = nextValue;
+			}
 		}
-	}
 
-	@Override
-	protected void collectOneUnit() throws Exception {
-		OUT reduced = null;
-		if (batchNotFull()) {
-			reduced = reuse.getObject();
-			resetReuse();
-			while (getNextRecord() != null && batchNotFull()) {
-				reduced = reducer.reduce(reduced, reuse.getObject());
-				resetReuse();
+		protected synchronized void checkBatchEnd(long timeStamp) {
+			nextRecordTime = timeStamp;
+
+			while (miniBatchEnd()) {
+				addToBuffer();
+				if (batchEnd()) {
+					reduceBatch();
+				}
 			}
 		}
-		if (reduced != null) {
-			state.pushBack(reduced);
-		} else {
-			state.pushBack(nullElement);
+
+		@Override
+		protected boolean miniBatchEnd() {
+			if (nextRecordTime < startTime + granularity) {
+				return false;
+			} else {
+				startTime += granularity;
+				return true;
+			}
+		}
+
+		@Override
+		public boolean batchEnd() {
+			if (minibatchCounter == numberOfBatches) {
+				minibatchCounter -= batchPerSlide;
+				return true;
+			}
+			return false;
 		}
+
 	}
 
 	@Override
-	protected void callUserFunction() throws Exception {
-		Iterator<OUT> reducedIterator = state.getBufferIterator();
-		OUT reduced = null;
-		do {
-			OUT next = reducedIterator.next();
-			if (next != nullElement) {
-				reduced = next;
-			}
-		} while (reducedIterator.hasNext() && reduced == null);
+	public void open(Configuration config) throws Exception {
+		super.open(config);
+		if (timestamp instanceof DefaultTimeStamp) {
+			(new TimeCheck()).start();
+		}
+	}
 
-		while (reducedIterator.hasNext()) {
-			OUT next = reducedIterator.next();
-			if (next != null) {
+	private class TimeCheck extends Thread {
+		@Override
+		public void run() {
+			while (true) {
 				try {
-					next = typeSerializer.copy(next, reduceReuse);
-					reduced = reducer.reduce(reduced, next);
-				} catch (ClassCastException e) {
-					// nullElement in buffer
+					Thread.sleep(slideSize);
+				} catch (InterruptedException e) {
+				}
+				if (isRunning) {
+					window.checkBatchEnd(System.currentTimeMillis());
+				} else {
+					break;
 				}
 			}
 		}
-		if (reduced != null) {
-			collector.collect(reduced);
-		}
 	}
 
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6b6951ef/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/NullableCircularBuffer.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/NullableCircularBuffer.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/NullableCircularBuffer.java
new file mode 100644
index 0000000..d8e0116
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/NullableCircularBuffer.java
@@ -0,0 +1,362 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.state;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.util.AbstractCollection;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+import org.apache.commons.collections.BoundedCollection;
+import org.apache.commons.collections.Buffer;
+import org.apache.commons.collections.BufferUnderflowException;
+
+@SuppressWarnings("rawtypes")
+public class NullableCircularBuffer extends AbstractCollection implements Buffer,
+		BoundedCollection, Serializable {
+
+	/** Serialization version */
+	private static final long serialVersionUID = 5603722811189451017L;
+
+	/** Underlying storage array */
+	private transient Object[] elements;
+
+	/** Array index of first (oldest) buffer element */
+	private transient int start = 0;
+
+	/**
+	 * Index mod maxElements of the array position following the last buffer
+	 * element. Buffer elements start at elements[start] and "wrap around"
+	 * elements[maxElements-1], ending at elements[decrement(end)]. For example,
+	 * elements = {c,a,b}, start=1, end=1 corresponds to the buffer [a,b,c].
+	 */
+	private transient int end = 0;
+
+	/** Flag to indicate if the buffer is currently full. */
+	private transient boolean full = false;
+
+	/** Capacity of the buffer */
+	private final int maxElements;
+
+	/**
+	 * Constructs a new <code>BoundedFifoBuffer</code> big enough to hold 32
+	 * elements.
+	 */
+	public NullableCircularBuffer() {
+		this(32);
+	}
+
+	/**
+	 * Constructs a new <code>BoundedFifoBuffer</code> big enough to hold the
+	 * specified number of elements.
+	 *
+	 * @param size
+	 *            the maximum number of elements for this fifo
+	 * @throws IllegalArgumentException
+	 *             if the size is less than 1
+	 */
+	public NullableCircularBuffer(int size) {
+		if (size <= 0) {
+			throw new IllegalArgumentException("The size must be greater than 0");
+		}
+		elements = new Object[size];
+		maxElements = elements.length;
+	}
+
+	/**
+	 * Constructs a new <code>BoundedFifoBuffer</code> big enough to hold all of
+	 * the elements in the specified collection. That collection's elements will
+	 * also be added to the buffer.
+	 *
+	 * @param coll
+	 *            the collection whose elements to add, may not be null
+	 * @throws NullPointerException
+	 *             if the collection is null
+	 */
+	@SuppressWarnings("unchecked")
+	public NullableCircularBuffer(Collection coll) {
+		this(coll.size());
+		addAll(coll);
+	}
+
+	// -----------------------------------------------------------------------
+	/**
+	 * Write the buffer out using a custom routine.
+	 * 
+	 * @param out
+	 *            the output stream
+	 * @throws IOException
+	 */
+	private void writeObject(ObjectOutputStream out) throws IOException {
+		out.defaultWriteObject();
+		out.writeInt(size());
+		for (Iterator it = iterator(); it.hasNext();) {
+			out.writeObject(it.next());
+		}
+	}
+
+	/**
+	 * Read the buffer in using a custom routine.
+	 * 
+	 * @param in
+	 *            the input stream
+	 * @throws IOException
+	 * @throws ClassNotFoundException
+	 */
+	private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+		in.defaultReadObject();
+		elements = new Object[maxElements];
+		int size = in.readInt();
+		for (int i = 0; i < size; i++) {
+			elements[i] = in.readObject();
+		}
+		start = 0;
+		full = (size == maxElements);
+		if (full) {
+			end = 0;
+		} else {
+			end = size;
+		}
+	}
+
+	// -----------------------------------------------------------------------
+	/**
+	 * Returns the number of elements stored in the buffer.
+	 *
+	 * @return this buffer's size
+	 */
+	public int size() {
+		int size = 0;
+
+		if (end < start) {
+			size = maxElements - start + end;
+		} else if (end == start) {
+			size = (full ? maxElements : 0);
+		} else {
+			size = end - start;
+		}
+
+		return size;
+	}
+
+	/**
+	 * Returns true if this buffer is empty; false otherwise.
+	 *
+	 * @return true if this buffer is empty
+	 */
+	public boolean isEmpty() {
+		return size() == 0;
+	}
+
+	/**
+	 * Returns true if this collection is full and no new elements can be added.
+	 *
+	 * @return <code>true</code> if the collection is full
+	 */
+	public boolean isFull() {
+		return size() == maxElements;
+	}
+
+	/**
+	 * Gets the maximum size of the collection (the bound).
+	 *
+	 * @return the maximum number of elements the collection can hold
+	 */
+	public int maxSize() {
+		return maxElements;
+	}
+
+	/**
+	 * Clears this buffer.
+	 */
+	public void clear() {
+		full = false;
+		start = 0;
+		end = 0;
+		Arrays.fill(elements, null);
+	}
+
+	/**
+	 * Adds the given element to this buffer.
+	 *
+	 * @param element
+	 *            the element to add
+	 * @return true, always
+	 */
+	public boolean add(Object element) {
+
+		if (isFull()) {
+			remove();
+		}
+
+		elements[end++] = element;
+
+		if (end >= maxElements) {
+			end = 0;
+		}
+
+		if (end == start) {
+			full = true;
+		}
+
+		return true;
+	}
+
+	/**
+	 * Returns the least recently inserted element in this buffer.
+	 *
+	 * @return the least recently inserted element
+	 * @throws BufferUnderflowException
+	 *             if the buffer is empty
+	 */
+	public Object get() {
+		if (isEmpty()) {
+			throw new BufferUnderflowException("The buffer is already empty");
+		}
+
+		return elements[start];
+	}
+
+	/**
+	 * Removes the least recently inserted element from this buffer.
+	 *
+	 * @return the least recently inserted element
+	 * @throws BufferUnderflowException
+	 *             if the buffer is empty
+	 */
+	public Object remove() {
+		if (isEmpty()) {
+			throw new BufferUnderflowException("The buffer is already empty");
+		}
+
+		Object element = elements[start];
+
+		elements[start++] = null;
+
+		if (start >= maxElements) {
+			start = 0;
+		}
+
+		full = false;
+
+		return element;
+	}
+
+	/**
+	 * Increments the internal index.
+	 * 
+	 * @param index
+	 *            the index to increment
+	 * @return the updated index
+	 */
+	private int increment(int index) {
+		index++;
+		if (index >= maxElements) {
+			index = 0;
+		}
+		return index;
+	}
+
+	/**
+	 * Decrements the internal index.
+	 * 
+	 * @param index
+	 *            the index to decrement
+	 * @return the updated index
+	 */
+	private int decrement(int index) {
+		index--;
+		if (index < 0) {
+			index = maxElements - 1;
+		}
+		return index;
+	}
+
+	/**
+	 * Returns an iterator over this buffer's elements.
+	 *
+	 * @return an iterator over this buffer's elements
+	 */
+	public Iterator iterator() {
+		return new Iterator() {
+
+			private int index = start;
+			private int lastReturnedIndex = -1;
+			private boolean isFirst = full;
+
+			public boolean hasNext() {
+				return isFirst || (index != end);
+
+			}
+
+			public Object next() {
+				if (!hasNext()) {
+					throw new NoSuchElementException();
+				}
+				isFirst = false;
+				lastReturnedIndex = index;
+				index = increment(index);
+				return elements[lastReturnedIndex];
+			}
+
+			public void remove() {
+				if (lastReturnedIndex == -1) {
+					throw new IllegalStateException();
+				}
+
+				// First element can be removed quickly
+				if (lastReturnedIndex == start) {
+					NullableCircularBuffer.this.remove();
+					lastReturnedIndex = -1;
+					return;
+				}
+
+				int pos = lastReturnedIndex + 1;
+				if (start < lastReturnedIndex && pos < end) {
+					// shift in one part
+					System.arraycopy(elements, pos, elements, lastReturnedIndex, end - pos);
+				} else {
+					// Other elements require us to shift the subsequent
+					// elements
+					while (pos != end) {
+						if (pos >= maxElements) {
+							elements[pos - 1] = elements[0];
+							pos = 0;
+						} else {
+							elements[decrement(pos)] = elements[pos];
+							pos = increment(pos);
+						}
+					}
+				}
+
+				lastReturnedIndex = -1;
+				end = decrement(end);
+				elements[end] = null;
+				full = false;
+				index = decrement(index);
+			}
+
+		};
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6b6951ef/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceTest.java
index 3140dc0..f70412f 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceTest.java
@@ -50,7 +50,7 @@ public class BatchReduceTest {
 		expected.add(12);
 		expected.add(18);
 		expected.add(24);
-		expected.add(27);
+		expected.add(19);
 		assertEquals(expected, MockInvokable.createAndExecute(invokable, inputs));
 
 		List<Integer> inputs2 = new ArrayList<Integer>();
@@ -58,7 +58,7 @@ public class BatchReduceTest {
 		inputs2.add(2);
 		inputs2.add(-1);
 		inputs2.add(-3);
-		inputs2.add(3);
+		inputs2.add(-4);
 
 		BatchReduceInvokable<Integer> invokable2 = new BatchReduceInvokable<Integer>(
 				new ReduceFunction<Integer>() {
@@ -72,12 +72,11 @@ public class BatchReduceTest {
 							return value2;
 						}
 					}
-				}, 2, 2);
+				}, 2, 3);
 
 		List<Integer> expected2 = new ArrayList<Integer>();
 		expected2.add(1);
-		expected2.add(-3);
-		expected2.add(3);
+		expected2.add(-4);
 
 		assertEquals(expected2, MockInvokable.createAndExecute(invokable2, inputs2));
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6b6951ef/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedBatchReduceTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedBatchReduceTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedBatchReduceTest.java
index 850ac8d..00550f0 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedBatchReduceTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedBatchReduceTest.java
@@ -44,6 +44,15 @@ public class GroupedBatchReduceTest {
 		inputs.add(5);
 		inputs.add(1);
 		inputs.add(5);
+		
+		List<Integer> expected = new ArrayList<Integer>();
+		expected.add(15);
+		expected.add(3);
+		expected.add(3);
+		expected.add(15);
+		expected.add(1);
+		expected.add(5);
+
 
 		GroupedBatchReduceInvokable<Integer> invokable = new GroupedBatchReduceInvokable<Integer>(
 				new ReduceFunction<Integer>() {
@@ -53,17 +62,8 @@ public class GroupedBatchReduceTest {
 					public Integer reduce(Integer value1, Integer value2) throws Exception {
 						return value1 + value2;
 					}
-				}, 4, 2, 0);
-
-		List<Integer> expected = new ArrayList<Integer>();
-		expected.add(2);
-		expected.add(10);
-		expected.add(1);
-		expected.add(15);
-		expected.add(2);
-		expected.add(10);
-		expected.add(2);
-		expected.add(10);
+				}, 3, 2, 0);
+		
 		List<Integer> actual = MockInvokable.createAndExecute(invokable, inputs);
 		assertEquals(new HashSet<Integer>(expected), new HashSet<Integer>(actual));
 		assertEquals(expected.size(), actual.size());
@@ -77,6 +77,11 @@ public class GroupedBatchReduceTest {
 		inputs2.add(new Tuple2<Integer, String>(10, "a"));
 		inputs2.add(new Tuple2<Integer, String>(2, "b"));
 		inputs2.add(new Tuple2<Integer, String>(1, "a"));
+		
+		List<Tuple2<Integer, String>> expected2 = new ArrayList<Tuple2<Integer, String>>();
+		expected2.add(new Tuple2<Integer, String>(-1, "a"));
+		expected2.add(new Tuple2<Integer, String>(-2, "a"));
+		expected2.add(new Tuple2<Integer, String>(0, "b"));
 
 		GroupedBatchReduceInvokable<Tuple2<Integer, String>> invokable2 = new GroupedBatchReduceInvokable<Tuple2<Integer, String>>(
 				new ReduceFunction<Tuple2<Integer, String>>() {
@@ -93,14 +98,10 @@ public class GroupedBatchReduceTest {
 					}
 				}, 3, 3, 1);
 
-		List<Tuple2<Integer, String>> expected2 = new ArrayList<Tuple2<Integer, String>>();
-		expected2.add(new Tuple2<Integer, String>(1, "a"));
-		expected2.add(new Tuple2<Integer, String>(0, "b"));
-		expected2.add(new Tuple2<Integer, String>(-2, "a"));
-		expected2.add(new Tuple2<Integer, String>(2, "b"));
-		expected2.add(new Tuple2<Integer, String>(1, "a"));
+		
 
 		List<Tuple2<Integer, String>> actual2 = MockInvokable.createAndExecute(invokable2, inputs2);
+		
 		assertEquals(new HashSet<Tuple2<Integer, String>>(expected2),
 				new HashSet<Tuple2<Integer, String>>(actual2));
 		assertEquals(expected2.size(), actual2.size());

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6b6951ef/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/WindowReduceInvokableTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/WindowReduceInvokableTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/WindowReduceInvokableTest.java
index ff0951d..8b4431a 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/WindowReduceInvokableTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/WindowReduceInvokableTest.java
@@ -44,6 +44,17 @@ public class WindowReduceInvokableTest {
 		inputs.add(10);
 		inputs.add(11);
 		inputs.add(11);
+		//1,2,3,4-3,4,5,6-5,6,7,8-7,8,9,10-9,10,11
+		//12-12-5-10-32
+		
+		List<Integer> expected = new ArrayList<Integer>();
+		expected.add(12);
+		expected.add(12);
+		expected.add(5);
+		expected.add(10);
+		expected.add(32);
+
+		
 		WindowReduceInvokable<Integer> invokable = new WindowReduceInvokable<Integer>(
 				new ReduceFunction<Integer>() {
 					private static final long serialVersionUID = 1L;
@@ -62,15 +73,11 @@ public class WindowReduceInvokableTest {
 
 					@Override
 					public long getStartTime() {
-						return 0;
+						return 1;
 					}
 				});
 
-		List<Integer> expected = new ArrayList<Integer>();
-		expected.add(8);
-		expected.add(16);
-		expected.add(9);
-		expected.add(32);
+		
 		assertEquals(expected, MockInvokable.createAndExecute(invokable, inputs));
 
 		List<Tuple2<String, Integer>> inputs2 = new ArrayList<Tuple2<String, Integer>>();
@@ -78,9 +85,18 @@ public class WindowReduceInvokableTest {
 		inputs2.add(new Tuple2<String, Integer>("a", 2));
 		inputs2.add(new Tuple2<String, Integer>("b", 2));
 		inputs2.add(new Tuple2<String, Integer>("b", 2));
-		inputs2.add(new Tuple2<String, Integer>("a", 3));
-		inputs2.add(new Tuple2<String, Integer>("b", 4));
 		inputs2.add(new Tuple2<String, Integer>("b", 5));
+		inputs2.add(new Tuple2<String, Integer>("a", 7));
+		inputs2.add(new Tuple2<String, Integer>("b", 9));
+		inputs2.add(new Tuple2<String, Integer>("b", 10));
+		
+		List<Tuple2<String, Integer>> expected2 = new ArrayList<Tuple2<String, Integer>>();
+		expected2.add(new Tuple2<String, Integer>("a", 3));
+		expected2.add(new Tuple2<String, Integer>("b", 4));
+		expected2.add(new Tuple2<String, Integer>("b", 5));
+		expected2.add(new Tuple2<String, Integer>("a", 7));
+		expected2.add(new Tuple2<String, Integer>("b", 10));
+
 
 		GroupedWindowReduceInvokable<Tuple2<String, Integer>> invokable2 = new GroupedWindowReduceInvokable<Tuple2<String, Integer>>(
 				new ReduceFunction<Tuple2<String, Integer>>() {
@@ -91,7 +107,7 @@ public class WindowReduceInvokableTest {
 							Tuple2<String, Integer> value2) throws Exception {
 						return new Tuple2<String, Integer>(value1.f0, value1.f1 + value2.f1);
 					}
-				}, 3, 2, 0, new TimeStamp<Tuple2<String, Integer>>() {
+				}, 2, 3, 0, new TimeStamp<Tuple2<String, Integer>>() {
 					private static final long serialVersionUID = 1L;
 
 					@Override
@@ -105,11 +121,7 @@ public class WindowReduceInvokableTest {
 					}
 				});
 
-		List<Tuple2<String, Integer>> expected2 = new ArrayList<Tuple2<String, Integer>>();
-		expected2.add(new Tuple2<String, Integer>("a", 6));
-		expected2.add(new Tuple2<String, Integer>("b", 4));
-		expected2.add(new Tuple2<String, Integer>("b", 9));
-		expected2.add(new Tuple2<String, Integer>("a", 3));
+
 		List<Tuple2<String, Integer>> actual2 = MockInvokable.createAndExecute(invokable2, inputs2);
 		assertEquals(new HashSet<Tuple2<String, Integer>>(expected2),
 				new HashSet<Tuple2<String, Integer>>(actual2));


Mime
View raw message