flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gyf...@apache.org
Subject [09/10] flink git commit: [FLINK-1618] [streaming] Parallel time reduce
Date Fri, 20 Mar 2015 12:41:41 GMT
[FLINK-1618] [streaming] Parallel time reduce

Closes #485


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1377ca97
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1377ca97
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1377ca97

Branch: refs/heads/master
Commit: 1377ca97dd0d8d1bbb7224f562dfc4f68226e02b
Parents: 221e5e6
Author: Gyula Fora <gyfora@apache.org>
Authored: Mon Mar 16 09:52:41 2015 +0100
Committer: Gyula Fora <gyfora@apache.org>
Committed: Fri Mar 20 11:25:04 2015 +0100

----------------------------------------------------------------------
 .../api/StreamingJobGraphGenerator.java         |  28 +--
 .../api/datastream/DiscretizedStream.java       |  91 +++++++-
 .../api/datastream/WindowedDataStream.java      |  94 ++++++--
 .../api/invokable/ChainableInvokable.java       |  10 +
 .../streaming/api/invokable/SinkInvokable.java  |   2 +-
 .../api/invokable/operator/FilterInvokable.java |   2 +-
 .../invokable/operator/FlatMapInvokable.java    |   2 +-
 .../api/invokable/operator/MapInvokable.java    |   2 +-
 .../operator/StreamReduceInvokable.java         |   2 +-
 .../operator/windowing/EmptyWindowFilter.java   |  32 +++
 .../windowing/ParallelGroupedMerge.java         |  41 ++++
 .../operator/windowing/ParallelMerge.java       | 142 +++++++++++
 .../windowing/ParallelWindowPartitioner.java    |  84 -------
 .../operator/windowing/StreamDiscretizer.java   |  13 +-
 .../windowing/WindowBufferInvokable.java        |   3 +-
 .../operator/windowing/WindowMapper.java        |   1 +
 .../operator/windowing/WindowPartExtractor.java |  55 +++++
 .../operator/windowing/WindowPartitioner.java   |   6 +-
 .../streaming/api/windowing/StreamWindow.java   |  39 ++--
 .../api/windowing/StreamWindowSerializer.java   |  33 +--
 .../streaming/api/windowing/WindowUtils.java    |   7 +-
 .../windowbuffer/BasicWindowBuffer.java         |  13 +-
 .../windowbuffer/CompletePreAggregator.java     |  27 ---
 .../windowing/windowbuffer/PreAggregator.java   |  27 +++
 .../windowbuffer/SlidingPreReducer.java         |  16 +-
 .../windowbuffer/TumblingGroupedPreReducer.java |  16 +-
 .../windowbuffer/TumblingPreReducer.java        |  17 +-
 .../windowing/windowbuffer/WindowBuffer.java    |  33 ++-
 .../operator/windowing/ParallelMergeTest.java   | 119 ++++++++++
 .../windowing/WindowIntegrationTest.java        | 234 +++++++++++++------
 .../operator/windowing/WindowMergerTest.java    |   6 +-
 .../windowing/WindowPartitionerTest.java        |   8 +-
 .../api/windowing/StreamWindowTest.java         |   8 +-
 .../TumblingGroupedPreReducerTest.java          |  26 +++
 34 files changed, 915 insertions(+), 324 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/1377ca97/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java
index 8a110bf..0146448 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java
@@ -194,10 +194,12 @@ public class StreamingJobGraphGenerator {
 			for (StreamEdge chainable : chainedOutputs) {
 				outputChainedNames.add(chainedNames.get(chainable.getTargetVertex()));
 			}
-			String returnOperatorName = operatorName + " -> (" + StringUtils.join(outputChainedNames, ", ") + ")";
+			String returnOperatorName = operatorName + " -> ("
+					+ StringUtils.join(outputChainedNames, ", ") + ")";
 			return returnOperatorName;
 		} else if (chainedOutputs.size() == 1) {
-			String returnOperatorName = operatorName + " -> " + chainedNames.get(chainedOutputs.get(0).getTargetVertex());
+			String returnOperatorName = operatorName + " -> "
+					+ chainedNames.get(chainedOutputs.get(0).getTargetVertex());
 			return returnOperatorName;
 		} else {
 			return operatorName;
@@ -215,8 +217,7 @@ public class StreamingJobGraphGenerator {
 		}
 
 		if (LOG.isDebugEnabled()) {
-			LOG.debug("Parallelism set: {} for {}", streamGraph.getParallelism(vertexID),
-					vertexID);
+			LOG.debug("Parallelism set: {} for {}", streamGraph.getParallelism(vertexID), vertexID);
 		}
 
 		if (streamGraph.getInputFormat(vertexID) != null) {
@@ -263,7 +264,8 @@ public class StreamingJobGraphGenerator {
 		allOutputs.addAll(nonChainableOutputs);
 
 		for (StreamEdge output : allOutputs) {
-			config.setSelectedNames(output.getTargetVertex(), streamGraph.getEdge(vertexID, output.getTargetVertex()).getSelectedNames());
+			config.setSelectedNames(output.getTargetVertex(),
+					streamGraph.getEdge(vertexID, output.getTargetVertex()).getSelectedNames());
 		}
 
 		vertexConfigs.put(vertexID, config);
@@ -302,15 +304,15 @@ public class StreamingJobGraphGenerator {
 		StreamInvokable<?, ?> headInvokable = streamGraph.getInvokable(vertexID);
 		StreamInvokable<?, ?> outInvokable = streamGraph.getInvokable(outName);
 
-		return
-				streamGraph.getInEdges(outName).size() == 1
-						&& outInvokable != null
-						&& outInvokable.getChainingStrategy() == ChainingStrategy.ALWAYS
-						&& (headInvokable.getChainingStrategy() == ChainingStrategy.HEAD || headInvokable
+		return streamGraph.getInEdges(outName).size() == 1
+				&& outInvokable != null
+				&& outInvokable.getChainingStrategy() == ChainingStrategy.ALWAYS
+				&& (headInvokable.getChainingStrategy() == ChainingStrategy.HEAD || headInvokable
 						.getChainingStrategy() == ChainingStrategy.ALWAYS)
-						&& edge.getPartitioner().getStrategy() == PartitioningStrategy.FORWARD
-						&& streamGraph.getParallelism(vertexID) == streamGraph.getParallelism(outName)
-						&& streamGraph.chaining;
+				&& (edge.getPartitioner().getStrategy() == PartitioningStrategy.FORWARD || streamGraph
+						.getParallelism(outName) == 1)
+				&& streamGraph.getParallelism(vertexID) == streamGraph.getParallelism(outName)
+				&& streamGraph.chaining;
 	}
 
 	private void setSlotSharing() {

http://git-wip-us.apache.org/repos/asf/flink/blob/1377ca97/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java
index 6526aa6..7597b47 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java
@@ -18,21 +18,31 @@
 package org.apache.flink.streaming.api.datastream;
 
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.FoldFunction;
 import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.functions.FoldFunction;
 import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.streaming.api.function.WindowMapFunction;
+import org.apache.flink.streaming.api.function.co.CoFlatMapFunction;
 import org.apache.flink.streaming.api.invokable.StreamInvokable;
+import org.apache.flink.streaming.api.invokable.operator.FilterInvokable;
+import org.apache.flink.streaming.api.invokable.operator.FlatMapInvokable;
+import org.apache.flink.streaming.api.invokable.operator.co.CoFlatMapInvokable;
+import org.apache.flink.streaming.api.invokable.operator.windowing.EmptyWindowFilter;
+import org.apache.flink.streaming.api.invokable.operator.windowing.ParallelGroupedMerge;
+import org.apache.flink.streaming.api.invokable.operator.windowing.ParallelMerge;
 import org.apache.flink.streaming.api.invokable.operator.windowing.WindowFlattener;
 import org.apache.flink.streaming.api.invokable.operator.windowing.WindowFolder;
 import org.apache.flink.streaming.api.invokable.operator.windowing.WindowMapper;
 import org.apache.flink.streaming.api.invokable.operator.windowing.WindowMerger;
+import org.apache.flink.streaming.api.invokable.operator.windowing.WindowPartExtractor;
 import org.apache.flink.streaming.api.invokable.operator.windowing.WindowPartitioner;
 import org.apache.flink.streaming.api.invokable.operator.windowing.WindowReducer;
 import org.apache.flink.streaming.api.windowing.StreamWindow;
@@ -91,6 +101,56 @@ public class DiscretizedStream<OUT> extends WindowedDataStream<OUT> {
 		}
 	}
 
+	/**
+	 * This method implements the parallel time reduce logic for time windows
+	 * 
+	 * @param reduceFunction
+	 *            The reduce function to be applied on the windows
+	 * @param isPreAggregated
+	 *            Flag whether the window buffer was a pre-aggregator or not
+	 * @return
+	 */
+	protected DiscretizedStream<OUT> timeReduce(ReduceFunction<OUT> reduceFunction,
+			boolean isPreAggregated) {
+
+		// We partition the windowed stream if it is not already pre-aggregated
+		DiscretizedStream<OUT> partitioned = isPreAggregated ? this : partition(transformation);
+
+		// Since we also emit the empty windows for bookkeeping, we need to
+		// filter them out
+		DiscretizedStream<OUT> nonEmpty = filterEmpty(partitioned);
+
+		// We extract the number of parts from each window we will merge using
+		// this afterwards
+		DataStream<Tuple2<Integer, Integer>> numOfParts = extractPartsByID(partitioned);
+
+		// We reduce the windows if not pre-aggregated
+		DiscretizedStream<OUT> reduced = isPreAggregated ? nonEmpty : nonEmpty.transform(
+				WindowTransformation.REDUCEWINDOW, "Window Reduce", nonEmpty.getType(),
+				new WindowReducer<OUT>(reduceFunction));
+
+		// We merge the windows by the number of parts
+		return wrap(parallelMerge(numOfParts, reduced, reduceFunction), false);
+
+	}
+
+	private SingleOutputStreamOperator<StreamWindow<OUT>, ?> parallelMerge(
+			DataStream<Tuple2<Integer, Integer>> numOfParts, DiscretizedStream<OUT> reduced,
+			ReduceFunction<OUT> reduceFunction) {
+
+		CoFlatMapFunction<StreamWindow<OUT>, Tuple2<Integer, Integer>, StreamWindow<OUT>> parallelMerger = isGrouped() ? new ParallelGroupedMerge<OUT>()
+				: new ParallelMerge<OUT>(reduceFunction);
+
+		return reduced.discretizedStream
+				.groupBy(new WindowKey<OUT>())
+				.connect(numOfParts.groupBy(0))
+				.addCoFunction(
+						"CoFlatMap",
+						reduced.discretizedStream.getType(),
+						new CoFlatMapInvokable<StreamWindow<OUT>, Tuple2<Integer, Integer>, StreamWindow<OUT>>(
+								parallelMerger));
+	}
+
 	@Override
 	public <R> DiscretizedStream<R> mapWindow(WindowMapFunction<OUT, R> windowMapFunction) {
 
@@ -128,6 +188,23 @@ public class DiscretizedStream<OUT> extends WindowedDataStream<OUT> {
 				invokable), transformation);
 	}
 
+	private DiscretizedStream<OUT> filterEmpty(DiscretizedStream<OUT> input) {
+		return wrap(input.discretizedStream.transform("Filter", input.discretizedStream.getType(),
+				new FilterInvokable<StreamWindow<OUT>>(new EmptyWindowFilter<OUT>())
+						.withoutInputCopy()), input.isPartitioned);
+	}
+
+	@SuppressWarnings({ "unchecked", "rawtypes" })
+	private DataStream<Tuple2<Integer, Integer>> extractPartsByID(DiscretizedStream<OUT> input) {
+		return input.discretizedStream
+				.transform(
+						"ExtractParts",
+						new TupleTypeInfo(Tuple2.class, BasicTypeInfo.INT_TYPE_INFO,
+								BasicTypeInfo.INT_TYPE_INFO),
+						new FlatMapInvokable<StreamWindow<OUT>, Tuple2<Integer, Integer>>(
+								new WindowPartExtractor<OUT>()).withoutInputCopy());
+	}
+
 	private DiscretizedStream<OUT> partition(WindowTransformation transformation) {
 
 		int parallelism = discretizedStream.getParallelism();
@@ -173,13 +250,15 @@ public class DiscretizedStream<OUT> extends WindowedDataStream<OUT> {
 
 	}
 
-	@SuppressWarnings("rawtypes")
-	private <R> DiscretizedStream<R> wrap(SingleOutputStreamOperator stream, boolean isPartitioned) {
-		return wrap(stream, transformation);
+	@SuppressWarnings("unchecked")
+	private <R> DiscretizedStream<R> wrap(SingleOutputStreamOperator<StreamWindow<R>, ?> stream,
+			boolean isPartitioned) {
+		return new DiscretizedStream<R>(stream, (KeySelector<R, ?>) this.groupByKey,
+				transformation, isPartitioned);
 	}
 
-	@SuppressWarnings({ "unchecked", "rawtypes" })
-	private <R> DiscretizedStream<R> wrap(SingleOutputStreamOperator stream,
+	@SuppressWarnings("unchecked")
+	private <R> DiscretizedStream<R> wrap(SingleOutputStreamOperator<StreamWindow<R>, ?> stream,
 			WindowTransformation transformation) {
 		return new DiscretizedStream<R>(stream, (KeySelector<R, ?>) this.groupByKey,
 				transformation, isPartitioned);

http://git-wip-us.apache.org/repos/asf/flink/blob/1377ca97/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
index c80546f..73cbdfd 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
@@ -58,7 +58,7 @@ import org.apache.flink.streaming.api.windowing.policy.TimeTriggerPolicy;
 import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
 import org.apache.flink.streaming.api.windowing.policy.TumblingEvictionPolicy;
 import org.apache.flink.streaming.api.windowing.windowbuffer.BasicWindowBuffer;
-import org.apache.flink.streaming.api.windowing.windowbuffer.CompletePreAggregator;
+import org.apache.flink.streaming.api.windowing.windowbuffer.PreAggregator;
 import org.apache.flink.streaming.api.windowing.windowbuffer.SlidingCountGroupedPreReducer;
 import org.apache.flink.streaming.api.windowing.windowbuffer.SlidingCountPreReducer;
 import org.apache.flink.streaming.api.windowing.windowbuffer.SlidingTimeGroupedPreReducer;
@@ -262,17 +262,25 @@ public class WindowedDataStream<OUT> {
 	 */
 	public DiscretizedStream<OUT> reduceWindow(ReduceFunction<OUT> reduceFunction) {
 
-		WindowTransformation transformation = WindowTransformation.REDUCEWINDOW
-				.with(clean(reduceFunction));
+		// We check whether we should apply parallel time discretization, which
+		// is a more complex exploiting the monotonic properties of time
+		// policies
+		if (WindowUtils.isTimeOnly(getTrigger(), getEviction()) && discretizerKey == null
+				&& dataStream.getParallelism() > 1) {
+			return timeReduce(reduceFunction);
+		} else {
+			WindowTransformation transformation = WindowTransformation.REDUCEWINDOW
+					.with(clean(reduceFunction));
 
-		WindowBuffer<OUT> windowBuffer = getWindowBuffer(transformation);
+			WindowBuffer<OUT> windowBuffer = getWindowBuffer(transformation);
 
-		DiscretizedStream<OUT> discretized = discretize(transformation, windowBuffer);
+			DiscretizedStream<OUT> discretized = discretize(transformation, windowBuffer);
 
-		if (windowBuffer instanceof CompletePreAggregator) {
-			return discretized;
-		} else {
-			return discretized.reduceWindow(reduceFunction);
+			if (windowBuffer instanceof PreAggregator) {
+				return discretized;
+			} else {
+				return discretized.reduceWindow(reduceFunction);
+			}
 		}
 	}
 
@@ -377,25 +385,76 @@ public class WindowedDataStream<OUT> {
 		int parallelism = getDiscretizerParallelism(transformation);
 
 		return new DiscretizedStream<OUT>(dataStream
-				.transform("Stream Discretizer", bufferEventType, discretizer)
+				.transform(discretizer.getClass().getSimpleName(), bufferEventType, discretizer)
 				.setParallelism(parallelism)
-				.transform("WindowBuffer", new StreamWindowTypeInfo<OUT>(getType()),
-						bufferInvokable).setParallelism(parallelism), groupByKey, transformation,
-				false);
+				.transform(windowBuffer.getClass().getSimpleName(),
+						new StreamWindowTypeInfo<OUT>(getType()), bufferInvokable)
+				.setParallelism(parallelism), groupByKey, transformation, false);
 
 	}
 
+	/**
+	 * Returns the degree of parallelism for the stream discretizer. The
+	 * returned parallelism is either 1 for for non-parallel global policies (or
+	 * when the input stream is non-parallel), environment parallelism for the
+	 * policies that can run in parallel (such as, any ditributed policy, reduce
+	 * by count or time).
+	 * 
+	 * @param transformation
+	 *            The applied transformation
+	 * @return The parallelism for the stream discretizer
+	 */
 	private int getDiscretizerParallelism(WindowTransformation transformation) {
 		return isLocal
 				|| (transformation == WindowTransformation.REDUCEWINDOW && WindowUtils
 						.isParallelPolicy(getTrigger(), getEviction(), dataStream.getParallelism()))
 				|| (discretizerKey != null) ? dataStream.environment.getDegreeOfParallelism() : 1;
+
+	}
+
+	/**
+	 * Dedicated method for applying parallel time reduce transformations on
+	 * windows
+	 * 
+	 * @param reduceFunction
+	 *            Reduce function to apply
+	 * @return The transformed stream
+	 */
+	protected DiscretizedStream<OUT> timeReduce(ReduceFunction<OUT> reduceFunction) {
+
+		WindowTransformation transformation = WindowTransformation.REDUCEWINDOW
+				.with(clean(reduceFunction));
+
+		// We get the windowbuffer and set it to emit empty windows with
+		// sequential IDs. This logic is necessarry to merge windows created in
+		// parallel.
+		WindowBuffer<OUT> windowBuffer = getWindowBuffer(transformation).emitEmpty().sequentialID();
+
+		// If there is a groupby for the reduce operation we apply it before the
+		// discretizers, because we will forward everything afterwards to
+		// exploit task chaining
+		if (groupByKey != null) {
+			dataStream = dataStream.groupBy(groupByKey);
+		}
+
+		// We discretize the stream and call the timeReduce function of the
+		// discretized stream, we also pass the type of the windowbuffer
+		DiscretizedStream<OUT> discretized = discretize(transformation, windowBuffer);
+
+		return discretized
+				.timeReduce(reduceFunction, windowBuffer instanceof PreAggregator);
+
 	}
 
+	/**
+	 * Based on the defined policies, returns the stream discretizer to be used
+	 */
 	private StreamInvokable<OUT, WindowEvent<OUT>> getDiscretizer() {
 		if (discretizerKey == null) {
 			return new StreamDiscretizer<OUT>(getTrigger(), getEviction());
 		} else if (WindowUtils.isSystemTimeTrigger(getTrigger())) {
+			// We return a special more efficient grouped discretizer for system
+			// time policies to avoid lunching multiple threads
 			return new GroupedTimeDiscretizer<OUT>(discretizerKey,
 					(TimeTriggerPolicy<OUT>) getTrigger(),
 					(CloneableEvictionPolicy<OUT>) getEviction());
@@ -416,6 +475,13 @@ public class WindowedDataStream<OUT> {
 		}
 	}
 
+	/**
+	 * Based on the given policies returns the WindowBuffer used to store the
+	 * elements in the window. This is the module that also encapsulates the
+	 * pre-aggregator logic when it is applicable, reducing the space cost, and
+	 * trigger latency.
+	 * 
+	 */
 	@SuppressWarnings("unchecked")
 	private WindowBuffer<OUT> getWindowBuffer(WindowTransformation transformation) {
 		TriggerPolicy<OUT> trigger = getTrigger();
@@ -704,7 +770,7 @@ public class WindowedDataStream<OUT> {
 
 		if (evictionHelper != null) {
 			return evictionHelper.toEvict();
-		} else if (userEvicter == null) {
+		} else if (userEvicter == null || userEvicter instanceof TumblingEvictionPolicy) {
 			if (triggerHelper instanceof Time) {
 				return triggerHelper.toEvict();
 			} else {

http://git-wip-us.apache.org/repos/asf/flink/blob/1377ca97/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/ChainableInvokable.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/ChainableInvokable.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/ChainableInvokable.java
index 24c0319..470fc81 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/ChainableInvokable.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/ChainableInvokable.java
@@ -25,6 +25,7 @@ public abstract class ChainableInvokable<IN, OUT> extends StreamInvokable<IN, OU
 		Collector<IN> {
 
 	private static final long serialVersionUID = 1L;
+	private boolean copyInput = true;
 
 	public ChainableInvokable(Function userFunction) {
 		super(userFunction);
@@ -36,4 +37,13 @@ public abstract class ChainableInvokable<IN, OUT> extends StreamInvokable<IN, OU
 		this.inSerializer = inSerializer;
 		this.objectSerializer = inSerializer.getObjectSerializer();
 	}
+
+	public ChainableInvokable<IN, OUT> withoutInputCopy() {
+		copyInput = false;
+		return this;
+	}
+
+	protected IN copyInput(IN input) {
+		return copyInput ? copy(input) : input;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/1377ca97/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SinkInvokable.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SinkInvokable.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SinkInvokable.java
index 35060fd..2c6b6e6 100755
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SinkInvokable.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SinkInvokable.java
@@ -43,7 +43,7 @@ public class SinkInvokable<IN> extends ChainableInvokable<IN, IN> {
 
 	@Override
 	public void collect(IN record) {
-		nextObject = copy(record);
+		nextObject = copyInput(record);
 		callUserFunctionAndLogException();
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/1377ca97/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FilterInvokable.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FilterInvokable.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FilterInvokable.java
index ab3f147..610fa53 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FilterInvokable.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FilterInvokable.java
@@ -50,7 +50,7 @@ public class FilterInvokable<IN> extends ChainableInvokable<IN, IN> {
 	@Override
 	public void collect(IN record) {
 		if (isRunning) {
-			nextObject = copy(record);
+			nextObject = copyInput(record);
 			callUserFunctionAndLogException();
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/1377ca97/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FlatMapInvokable.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FlatMapInvokable.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FlatMapInvokable.java
index 025bd32..436cf4e 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FlatMapInvokable.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FlatMapInvokable.java
@@ -45,7 +45,7 @@ public class FlatMapInvokable<IN, OUT> extends ChainableInvokable<IN, OUT> {
 	@Override
 	public void collect(IN record) {
 		if (isRunning) {
-			nextObject = copy(record);
+			nextObject = copyInput(record);
 			callUserFunctionAndLogException();
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/1377ca97/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/MapInvokable.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/MapInvokable.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/MapInvokable.java
index 8fc1f13..9647144 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/MapInvokable.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/MapInvokable.java
@@ -45,7 +45,7 @@ public class MapInvokable<IN, OUT> extends ChainableInvokable<IN, OUT> {
 	@Override
 	public void collect(IN record) {
 		if (isRunning) {
-			nextObject = copy(record);
+			nextObject = copyInput(record);
 			callUserFunctionAndLogException();
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/1377ca97/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceInvokable.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceInvokable.java
index e7fa2b1..fe58105 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceInvokable.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceInvokable.java
@@ -63,7 +63,7 @@ public class StreamReduceInvokable<IN> extends ChainableInvokable<IN, IN> {
 	@Override
 	public void collect(IN record) {
 		if (isRunning) {
-			nextObject = copy(record);
+			nextObject = copyInput(record);
 			callUserFunctionAndLogException();
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/1377ca97/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/EmptyWindowFilter.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/EmptyWindowFilter.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/EmptyWindowFilter.java
new file mode 100644
index 0000000..0f2ee31
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/EmptyWindowFilter.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.invokable.operator.windowing;
+
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.streaming.api.windowing.StreamWindow;
+
+public class EmptyWindowFilter<OUT> implements FilterFunction<StreamWindow<OUT>> {
+
+	private static final long serialVersionUID = 1L;
+
+	@Override
+	public boolean filter(StreamWindow<OUT> value) throws Exception {
+		return !value.isEmpty();
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1377ca97/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/ParallelGroupedMerge.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/ParallelGroupedMerge.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/ParallelGroupedMerge.java
new file mode 100644
index 0000000..737485f
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/ParallelGroupedMerge.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.invokable.operator.windowing;
+
+import org.apache.flink.streaming.api.windowing.StreamWindow;
+
+/**
+ * The version of the ParallelMerge CoFlatMap that does not reduce the incoming
+ * elements only appends them to the current window. This is necessary for
+ * grouped reduces.
+ */
+public class ParallelGroupedMerge<OUT> extends ParallelMerge<OUT> {
+
+	private static final long serialVersionUID = 1L;
+
+	public ParallelGroupedMerge() {
+		super(null);
+	}
+
+	@Override
+	protected void updateCurrent(StreamWindow<OUT> current, StreamWindow<OUT> nextWindow)
+			throws Exception {
+		current.addAll(nextWindow);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1377ca97/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/ParallelMerge.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/ParallelMerge.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/ParallelMerge.java
new file mode 100644
index 0000000..8ffca91
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/ParallelMerge.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.invokable.operator.windowing;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.function.co.RichCoFlatMapFunction;
+import org.apache.flink.streaming.api.windowing.StreamWindow;
+import org.apache.flink.util.Collector;
+
+/**
+ * Class that encapsulates the functionality necessary to merge windows created
+ * in parallel. This CoFlatMap uses the information received on the number of
+ * parts for each window to merge the different parts. It waits until it
+ * receives an indication on the number of parts from all the discretizers
+ * before producing any output.
+ */
+public class ParallelMerge<OUT> extends
+		RichCoFlatMapFunction<StreamWindow<OUT>, Tuple2<Integer, Integer>, StreamWindow<OUT>> {
+
+	private static final long serialVersionUID = 1L;
+
+	protected Integer numberOfDiscretizers;
+	private ReduceFunction<OUT> reducer;
+
+	private Map<Integer, Integer> availableNumberOfParts = new HashMap<Integer, Integer>();
+	private Map<Integer, Tuple2<StreamWindow<OUT>, Integer>> receivedWindows = new HashMap<Integer, Tuple2<StreamWindow<OUT>, Integer>>();
+	private Map<Integer, Tuple2<Integer, Integer>> receivedNumberOfParts = new HashMap<Integer, Tuple2<Integer, Integer>>();
+
+	public ParallelMerge(ReduceFunction<OUT> reducer) {
+		this.reducer = reducer;
+	}
+
+	@Override
+	public void flatMap1(StreamWindow<OUT> nextWindow, Collector<StreamWindow<OUT>> out)
+			throws Exception {
+
+		Integer id = nextWindow.windowID;
+
+		Tuple2<StreamWindow<OUT>, Integer> current = receivedWindows.get(id);
+
+		if (current == null) {
+			current = new Tuple2<StreamWindow<OUT>, Integer>(nextWindow, 1);
+		} else {
+			updateCurrent(current.f0, nextWindow);
+			current.f1++;
+		}
+
+		Integer count = current.f1;
+
+		if (availableNumberOfParts.containsKey(id) && availableNumberOfParts.get(id) <= count) {
+			out.collect(current.f0);
+			receivedWindows.remove(id);
+			availableNumberOfParts.remove(id);
+
+			checkOld(id);
+
+		} else {
+			receivedWindows.put(id, (Tuple2<StreamWindow<OUT>, Integer>) current);
+		}
+	}
+
+	private void checkOld(Integer id) {
+		// In case we have remaining partial windows (which indicates errors in
+		// processing), output and log them
+		if (receivedWindows.containsKey(id - 1)) {
+			throw new RuntimeException("Error in processing logic, window with id " + id
+					+ " should have already been processed");
+		}
+
+	}
+
+	@Override
+	public void flatMap2(Tuple2<Integer, Integer> partInfo, Collector<StreamWindow<OUT>> out)
+			throws Exception {
+
+		Integer id = partInfo.f0;
+		Integer numOfParts = partInfo.f1;
+
+		Tuple2<Integer, Integer> currentPartInfo = receivedNumberOfParts.get(id);
+		if (currentPartInfo != null) {
+			currentPartInfo.f0 += numOfParts;
+			currentPartInfo.f1++;
+		} else {
+			currentPartInfo = new Tuple2<Integer, Integer>(numOfParts, 1);
+			receivedNumberOfParts.put(id, currentPartInfo);
+		}
+
+		if (currentPartInfo.f1 >= numberOfDiscretizers) {
+			receivedNumberOfParts.remove(id);
+
+			Tuple2<StreamWindow<OUT>, Integer> current = receivedWindows.get(id);
+
+			Integer count = current != null ? current.f1 : -1;
+
+			if (count >= currentPartInfo.f0) {
+				out.collect(current.f0);
+				receivedWindows.remove(id);
+				checkOld(id);
+			} else if (currentPartInfo.f0 > 0) {
+				availableNumberOfParts.put(id, currentPartInfo.f1);
+			}
+		}
+
+	}
+
+	protected void updateCurrent(StreamWindow<OUT> current, StreamWindow<OUT> nextWindow)
+			throws Exception {
+		if (current.size() != 1 || nextWindow.size() != 1) {
+			throw new RuntimeException(
+					"Error in parallel merge logic. Current window should contain only one element.");
+		}
+		OUT currentReduced = current.remove(0);
+		currentReduced = reducer.reduce(currentReduced, nextWindow.get(0));
+		current.add(currentReduced);
+	}
+
+	@Override
+	public void open(Configuration conf) {
+		this.numberOfDiscretizers = getRuntimeContext().getNumberOfParallelSubtasks();
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1377ca97/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/ParallelWindowPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/ParallelWindowPartitioner.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/ParallelWindowPartitioner.java
deleted file mode 100644
index 32778da..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/ParallelWindowPartitioner.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.operator.windowing;
-
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.streaming.api.invokable.ChainableInvokable;
-import org.apache.flink.streaming.api.windowing.StreamWindow;
-
-/**
- * This invokable applies either split or key partitioning depending on the
- * transformation.
- */
-public class ParallelWindowPartitioner<T> extends
-		ChainableInvokable<StreamWindow<T>, StreamWindow<T>> {
-
-	private KeySelector<T, ?> keySelector;
-	private int numberOfSplits;
-	private int currentWindowID = 0;
-
-	public ParallelWindowPartitioner(KeySelector<T, ?> keySelector) {
-		super(null);
-		this.keySelector = keySelector;
-	}
-
-	public ParallelWindowPartitioner(int numberOfSplits) {
-		super(null);
-		this.numberOfSplits = numberOfSplits;
-	}
-
-	private static final long serialVersionUID = 1L;
-
-	@Override
-	public void invoke() throws Exception {
-		while (isRunning && readNext() != null) {
-			callUserFunctionAndLogException();
-		}
-	}
-
-	@Override
-	protected void callUserFunction() throws Exception {
-		StreamWindow<T> currentWindow = nextObject;
-		currentWindow.setID(++currentWindowID);
-
-		if (keySelector == null) {
-			if (numberOfSplits <= 1) {
-				collector.collect(currentWindow);
-			} else {
-				for (StreamWindow<T> window : currentWindow.split(numberOfSplits)) {
-					collector.collect(window);
-				}
-			}
-		} else {
-
-			for (StreamWindow<T> window : currentWindow.partitionBy(keySelector)) {
-				collector.collect(window);
-			}
-
-		}
-	}
-
-	@Override
-	public void collect(StreamWindow<T> record) {
-		if (isRunning) {
-			nextObject = record;
-			callUserFunctionAndLogException();
-		}
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/1377ca97/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/StreamDiscretizer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/StreamDiscretizer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/StreamDiscretizer.java
index e668b66..30512e6 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/StreamDiscretizer.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/StreamDiscretizer.java
@@ -25,13 +25,11 @@ import org.apache.flink.streaming.api.windowing.policy.ActiveTriggerCallback;
 import org.apache.flink.streaming.api.windowing.policy.ActiveTriggerPolicy;
 import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy;
 import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
-import org.apache.flink.streaming.api.windowing.windowbuffer.WindowBuffer;
 
 /**
  * This invokable represents the discretization step of a window transformation.
  * The user supplied eviction and trigger policies are applied to create the
  * {@link StreamWindow} that will be further transformed in the next stages.
- * </p> To allow pre-aggregations supply an appropriate {@link WindowBuffer}
  */
 public class StreamDiscretizer<IN> extends StreamInvokable<IN, WindowEvent<IN>> {
 
@@ -129,11 +127,6 @@ public class StreamDiscretizer<IN> extends StreamInvokable<IN, WindowEvent<IN>>
 		emitWindow();
 	}
 
-	protected synchronized void externalTriggerOnFakeElement(Object input) {
-		emitWindow();
-		activeEvict(input);
-	}
-
 	/**
 	 * This method emits the content of the buffer as a new {@link StreamWindow}
 	 * if not empty
@@ -175,7 +168,7 @@ public class StreamDiscretizer<IN> extends StreamInvokable<IN, WindowEvent<IN>>
 			ActiveTriggerPolicy<IN> tp = (ActiveTriggerPolicy<IN>) triggerPolicy;
 
 			Runnable runnable = tp.createActiveTriggerRunnable(new WindowingCallback());
-			if (activePolicyThread != null) {
+			if (runnable != null) {
 				activePolicyThread = new Thread(runnable);
 				activePolicyThread.start();
 			}
@@ -216,7 +209,7 @@ public class StreamDiscretizer<IN> extends StreamInvokable<IN, WindowEvent<IN>>
 
 	@Override
 	public String toString() {
-		return "Discretizer(Trigger: " + triggerPolicy.toString() + ", Eviction: " + evictionPolicy.toString()
-				+ ")";
+		return "Discretizer(Trigger: " + triggerPolicy.toString() + ", Eviction: "
+				+ evictionPolicy.toString() + ")";
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/1377ca97/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowBufferInvokable.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowBufferInvokable.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowBufferInvokable.java
index 75f7d9d..475611f 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowBufferInvokable.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowBufferInvokable.java
@@ -23,8 +23,7 @@ import org.apache.flink.streaming.api.windowing.WindowEvent;
 import org.apache.flink.streaming.api.windowing.windowbuffer.WindowBuffer;
 
 /**
- * This invokable flattens the results of the window transformations by
- * outputing the elements of the {@link StreamWindow} one-by-one
+ * This invokable manages the window buffers attached to the discretizers.
  */
 public class WindowBufferInvokable<T> extends ChainableInvokable<WindowEvent<T>, StreamWindow<T>> {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/1377ca97/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowMapper.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowMapper.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowMapper.java
index 3dfd59d..9578a70 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowMapper.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowMapper.java
@@ -50,6 +50,7 @@ public class WindowMapper<IN, OUT> extends MapInvokable<StreamWindow<IN>, Stream
 		@Override
 		public StreamWindow<R> map(StreamWindow<T> window) throws Exception {
 			StreamWindow<R> outputWindow = new StreamWindow<R>(window.windowID);
+			
 			outputWindow.numberOfParts = window.numberOfParts;
 
 			mapper.mapWindow(window, outputWindow);

http://git-wip-us.apache.org/repos/asf/flink/blob/1377ca97/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowPartExtractor.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowPartExtractor.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowPartExtractor.java
new file mode 100644
index 0000000..416b915
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowPartExtractor.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.invokable.operator.windowing;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.windowing.StreamWindow;
+import org.apache.flink.util.Collector;
+
+/**
+ * This FlatMapFunction is used to send the number of parts for each window ID
+ * (for each parallel discretizer) to the parallel merger that will use is to
+ * merge parallel discretized windows
+ */
+public class WindowPartExtractor<OUT> implements FlatMapFunction<StreamWindow<OUT>, Tuple2<Integer, Integer>> {
+
+	private static final long serialVersionUID = 1L;
+
+	Integer lastIndex = -1;
+
+	@Override
+	public void flatMap(StreamWindow<OUT> value, Collector<Tuple2<Integer, Integer>> out)
+			throws Exception {
+
+		// We dont emit new values for the same index, this avoids sending the
+		// same information for the same partitioned window multiple times
+		if (value.windowID != lastIndex) {
+
+			// For empty windows we send 0 since these windows will be filtered
+			// out
+			if (value.isEmpty()) {
+				out.collect(new Tuple2<Integer, Integer>(value.windowID, 0));
+			} else {
+				out.collect(new Tuple2<Integer, Integer>(value.windowID, value.numberOfParts));
+			}
+			lastIndex = value.windowID;
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1377ca97/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowPartitioner.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowPartitioner.java
index 846650d..0a28d99 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowPartitioner.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowPartitioner.java
@@ -56,13 +56,14 @@ public class WindowPartitioner<T> extends ChainableInvokable<StreamWindow<T>, St
 			if (numberOfSplits <= 1) {
 				collector.collect(currentWindow);
 			} else {
-				for (StreamWindow<T> window : currentWindow.split(numberOfSplits)) {
+				for (StreamWindow<T> window : StreamWindow.split(currentWindow, numberOfSplits)) {
 					collector.collect(window);
 				}
 			}
 		} else {
 
-			for (StreamWindow<T> window : currentWindow.partitionBy(keySelector)) {
+			for (StreamWindow<T> window : StreamWindow
+					.partitionBy(currentWindow, keySelector, true)) {
 				collector.collect(window);
 			}
 
@@ -76,5 +77,4 @@ public class WindowPartitioner<T> extends ChainableInvokable<StreamWindow<T>, St
 			callUserFunctionAndLogException();
 		}
 	}
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/1377ca97/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/StreamWindow.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/StreamWindow.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/StreamWindow.java
index b45babb..ee2ea06 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/StreamWindow.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/StreamWindow.java
@@ -42,7 +42,6 @@ public class StreamWindow<T> extends ArrayList<T> implements Collector<T> {
 	private static Random rnd = new Random();
 
 	public int windowID;
-
 	public int numberOfParts;
 
 	/**
@@ -104,27 +103,33 @@ public class StreamWindow<T> extends ArrayList<T> implements Collector<T> {
 	 * Partitions the window using the given keyselector. A subwindow will be
 	 * created for each key.
 	 * 
+	 * @param streamWindow
+	 *            StreamWindow instance to partition
 	 * @param keySelector
 	 *            The keyselector used for extracting keys.
+	 * @param withKey
+	 *            Flag to decide whether the key object should be included in
+	 *            the created window
 	 * @return A list of the subwindows
 	 */
-	public List<StreamWindow<T>> partitionBy(KeySelector<T, ?> keySelector) throws Exception {
-		Map<Object, StreamWindow<T>> partitions = new HashMap<Object, StreamWindow<T>>();
+	public static <X> List<StreamWindow<X>> partitionBy(StreamWindow<X> streamWindow,
+			KeySelector<X, ?> keySelector, boolean withKey) throws Exception {
+		Map<Object, StreamWindow<X>> partitions = new HashMap<Object, StreamWindow<X>>();
 
-		for (T value : this) {
+		for (X value : streamWindow) {
 			Object key = keySelector.getKey(value);
-			StreamWindow<T> window = partitions.get(key);
+			StreamWindow<X> window = partitions.get(key);
 			if (window == null) {
-				window = new StreamWindow<T>(this.windowID, 0);
+				window = new StreamWindow<X>(streamWindow.windowID, 0);
 				partitions.put(key, window);
 			}
 			window.add(value);
 		}
 
-		List<StreamWindow<T>> output = new ArrayList<StreamWindow<T>>();
+		List<StreamWindow<X>> output = new ArrayList<StreamWindow<X>>();
 		int numkeys = partitions.size();
 
-		for (StreamWindow<T> window : partitions.values()) {
+		for (StreamWindow<X> window : partitions.values()) {
 			output.add(window.setNumberOfParts(numkeys));
 		}
 
@@ -134,30 +139,32 @@ public class StreamWindow<T> extends ArrayList<T> implements Collector<T> {
 	/**
 	 * Splits the window into n equal (if possible) sizes.
 	 * 
+	 * @param window
+	 *            Window to split
 	 * @param n
 	 *            Number of desired partitions
 	 * @return The list of subwindows.
 	 */
-	public List<StreamWindow<T>> split(int n) {
-		int numElements = size();
+	public static <X> List<StreamWindow<X>> split(StreamWindow<X> window, int n) {
+		int numElements = window.size();
 		if (n == 0) {
-			return new ArrayList<StreamWindow<T>>();
+			return new ArrayList<StreamWindow<X>>();
 		}
 		if (n > numElements) {
-			return split(numElements);
+			return split(window, numElements);
 		} else {
-			List<StreamWindow<T>> split = new ArrayList<StreamWindow<T>>();
+			List<StreamWindow<X>> split = new ArrayList<StreamWindow<X>>();
 			int splitSize = numElements / n;
 
 			int index = -1;
 
-			StreamWindow<T> currentSubWindow = new StreamWindow<T>(windowID, n);
+			StreamWindow<X> currentSubWindow = new StreamWindow<X>(window.windowID, n);
 			split.add(currentSubWindow);
 
-			for (T element : this) {
+			for (X element : window) {
 				index++;
 				if (index == splitSize && split.size() < n) {
-					currentSubWindow = new StreamWindow<T>(windowID, n);
+					currentSubWindow = new StreamWindow<X>(window.windowID, n);
 					split.add(currentSubWindow);
 					index = 0;
 				}

http://git-wip-us.apache.org/repos/asf/flink/blob/1377ca97/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/StreamWindowSerializer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/StreamWindowSerializer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/StreamWindowSerializer.java
index e8945f1..229cb4a 100755
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/StreamWindowSerializer.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/StreamWindowSerializer.java
@@ -23,6 +23,7 @@ import java.io.IOException;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.BooleanSerializer;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
@@ -33,6 +34,7 @@ public final class StreamWindowSerializer<T> extends TypeSerializer<StreamWindow
 
 	private final TypeSerializer<T> typeSerializer;
 	TypeSerializer<Integer> intSerializer = IntSerializer.INSTANCE;
+	TypeSerializer<Boolean> boolSerializer = BooleanSerializer.INSTANCE;
 
 	public StreamWindowSerializer(TypeInformation<T> typeInfo, ExecutionConfig conf) {
 		this.typeSerializer = typeInfo.createSerializer(conf);
@@ -62,7 +64,6 @@ public final class StreamWindowSerializer<T> extends TypeSerializer<StreamWindow
 		reuse.clear();
 		reuse.windowID = from.windowID;
 		reuse.numberOfParts = from.numberOfParts;
-
 		for (T element : from) {
 			reuse.add(typeSerializer.copy(element));
 		}
@@ -75,31 +76,21 @@ public final class StreamWindowSerializer<T> extends TypeSerializer<StreamWindow
 	}
 
 	@Override
-	public void serialize(StreamWindow<T> value, DataOutputView target) throws IOException {
+	public void serialize(StreamWindow<T> window, DataOutputView target) throws IOException {
+
+		intSerializer.serialize(window.windowID, target);
+		intSerializer.serialize(window.numberOfParts, target);
 
-		intSerializer.serialize(value.windowID, target);
-		intSerializer.serialize(value.numberOfParts, target);
-		intSerializer.serialize(value.size(), target);
+		intSerializer.serialize(window.size(), target);
 
-		for (T element : value) {
+		for (T element : window) {
 			typeSerializer.serialize(element, target);
 		}
 	}
 
 	@Override
 	public StreamWindow<T> deserialize(DataInputView source) throws IOException {
-		StreamWindow<T> window = createInstance();
-
-		window.windowID = intSerializer.deserialize(source);
-		window.numberOfParts = intSerializer.deserialize(source);
-
-		int size = intSerializer.deserialize(source);
-
-		for (int i = 0; i < size; i++) {
-			window.add(typeSerializer.deserialize(source));
-		}
-
-		return window;
+		return deserialize(createInstance(), source);
 	}
 
 	@Override
@@ -109,10 +100,10 @@ public final class StreamWindowSerializer<T> extends TypeSerializer<StreamWindow
 		StreamWindow<T> window = reuse;
 		window.clear();
 
-		window.windowID = source.readInt();
-		window.numberOfParts = source.readInt();
+		window.windowID = intSerializer.deserialize(source);
+		window.numberOfParts = intSerializer.deserialize(source);
 
-		int size = source.readInt();
+		int size = intSerializer.deserialize(source);
 
 		for (int i = 0; i < size; i++) {
 			window.add(typeSerializer.deserialize(source));

http://git-wip-us.apache.org/repos/asf/flink/blob/1377ca97/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/WindowUtils.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/WindowUtils.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/WindowUtils.java
index 8411d31..034c7d7 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/WindowUtils.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/WindowUtils.java
@@ -45,9 +45,10 @@ public class WindowUtils {
 	}
 
 	public static boolean isParallelPolicy(TriggerPolicy<?> trigger, EvictionPolicy<?> eviction,
-			int inputParallelism) {
-		return inputParallelism != 1
-				&& ((eviction instanceof CountEvictionPolicy && (trigger instanceof CountTriggerPolicy || trigger instanceof TimeTriggerPolicy)) || (eviction instanceof TumblingEvictionPolicy && trigger instanceof CountTriggerPolicy));
+			int parallelism) {
+		return ((eviction instanceof CountEvictionPolicy && (trigger instanceof CountTriggerPolicy || trigger instanceof TimeTriggerPolicy))
+				|| (eviction instanceof TumblingEvictionPolicy && trigger instanceof CountTriggerPolicy) || (WindowUtils
+				.isTimeOnly(trigger, eviction) && parallelism > 1));
 	}
 
 	public static boolean isSlidingTimePolicy(TriggerPolicy<?> trigger, EvictionPolicy<?> eviction) {

http://git-wip-us.apache.org/repos/asf/flink/blob/1377ca97/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/BasicWindowBuffer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/BasicWindowBuffer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/BasicWindowBuffer.java
index 8e39398..371e20d 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/BasicWindowBuffer.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/BasicWindowBuffer.java
@@ -27,7 +27,7 @@ import org.apache.flink.util.Collector;
  * Basic window buffer that stores the elements in a simple list without any
  * pre-aggregation.
  */
-public class BasicWindowBuffer<T> implements WindowBuffer<T> {
+public class BasicWindowBuffer<T> extends WindowBuffer<T> {
 
 	private static final long serialVersionUID = 1L;
 	protected LinkedList<T> buffer;
@@ -36,15 +36,12 @@ public class BasicWindowBuffer<T> implements WindowBuffer<T> {
 		this.buffer = new LinkedList<T>();
 	}
 
-	public boolean emitWindow(Collector<StreamWindow<T>> collector) {
-		if (!buffer.isEmpty()) {
-			StreamWindow<T> currentWindow = new StreamWindow<T>();
+	public void emitWindow(Collector<StreamWindow<T>> collector) {
+		if (emitEmpty || !buffer.isEmpty()) {
+			StreamWindow<T> currentWindow = createEmptyWindow();
 			currentWindow.addAll(buffer);
 			collector.collect(currentWindow);
-			return true;
-		} else {
-			return false;
-		}
+		} 
 	}
 
 	public void store(T element) throws Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/1377ca97/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/CompletePreAggregator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/CompletePreAggregator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/CompletePreAggregator.java
deleted file mode 100644
index 59bd974..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/CompletePreAggregator.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUStreamRecord<?>WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.windowbuffer;
-
-/**
- * Interface for marking window pre-aggregators that fully process the window so
- * that no further reduce step is necessary afterwards.
- */
-public interface CompletePreAggregator {
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/1377ca97/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/PreAggregator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/PreAggregator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/PreAggregator.java
new file mode 100644
index 0000000..1b95248
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/PreAggregator.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUStreamRecord<?>WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.windowbuffer;
+
+/**
+ * Interface for marking window pre-aggregators that fully process the window so
+ * that no further reduce step is necessary afterwards.
+ */
+public interface PreAggregator {
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1377ca97/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingPreReducer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingPreReducer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingPreReducer.java
index 27f7ff5..8b9558f 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingPreReducer.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingPreReducer.java
@@ -24,7 +24,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.streaming.api.windowing.StreamWindow;
 import org.apache.flink.util.Collector;
 
-public abstract class SlidingPreReducer<T> implements WindowBuffer<T>, CompletePreAggregator {
+public abstract class SlidingPreReducer<T> extends WindowBuffer<T> implements PreAggregator {
 
 	private static final long serialVersionUID = 1L;
 
@@ -45,18 +45,14 @@ public abstract class SlidingPreReducer<T> implements WindowBuffer<T>, CompleteP
 		this.serializer = serializer;
 	}
 
-	public boolean emitWindow(Collector<StreamWindow<T>> collector) {
-		StreamWindow<T> currentWindow = new StreamWindow<T>();
+	public void emitWindow(Collector<StreamWindow<T>> collector) {
+		StreamWindow<T> currentWindow = createEmptyWindow();
 
 		try {
-			if (addFinalAggregate(currentWindow)) {
+			if (addFinalAggregate(currentWindow) || emitEmpty) {
 				collector.collect(currentWindow);
-				afterEmit();
-				return true;
-			} else {
-				afterEmit();
-				return false;
-			}
+			} 
+			afterEmit();
 		} catch (Exception e) {
 			throw new RuntimeException(e);
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/1377ca97/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingGroupedPreReducer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingGroupedPreReducer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingGroupedPreReducer.java
index 9431a99..68f9837 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingGroupedPreReducer.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingGroupedPreReducer.java
@@ -29,7 +29,7 @@ import org.apache.flink.util.Collector;
 /**
  * Grouped pre-reducer for tumbling eviction polciy.
  */
-public class TumblingGroupedPreReducer<T> implements WindowBuffer<T>, CompletePreAggregator {
+public class TumblingGroupedPreReducer<T> extends WindowBuffer<T> implements PreAggregator {
 
 	private static final long serialVersionUID = 1L;
 
@@ -37,6 +37,7 @@ public class TumblingGroupedPreReducer<T> implements WindowBuffer<T>, CompletePr
 	private KeySelector<T, ?> keySelector;
 
 	private Map<Object, T> reducedValues;
+	private Map<Object, T> keyInstancePerKey = new HashMap<Object, T>();
 
 	private TypeSerializer<T> serializer;
 
@@ -48,16 +49,15 @@ public class TumblingGroupedPreReducer<T> implements WindowBuffer<T>, CompletePr
 		this.reducedValues = new HashMap<Object, T>();
 	}
 
-	public boolean emitWindow(Collector<StreamWindow<T>> collector) {
+	public void emitWindow(Collector<StreamWindow<T>> collector) {
 
 		if (!reducedValues.isEmpty()) {
-			StreamWindow<T> currentWindow = new StreamWindow<T>();
+			StreamWindow<T> currentWindow = createEmptyWindow();
 			currentWindow.addAll(reducedValues.values());
 			collector.collect(currentWindow);
 			reducedValues.clear();
-			return true;
-		} else {
-			return false;
+		} else if (emitEmpty) {
+			collector.collect(createEmptyWindow());
 		}
 
 	}
@@ -74,6 +74,10 @@ public class TumblingGroupedPreReducer<T> implements WindowBuffer<T>, CompletePr
 		}
 
 		reducedValues.put(key, reduced);
+
+		if (emitPerGroup && !keyInstancePerKey.containsKey(key)) {
+			keyInstancePerKey.put(key, element);
+		}
 	}
 
 	public void evict(int n) {

http://git-wip-us.apache.org/repos/asf/flink/blob/1377ca97/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingPreReducer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingPreReducer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingPreReducer.java
index 58b30a6..e56e556 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingPreReducer.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingPreReducer.java
@@ -25,7 +25,7 @@ import org.apache.flink.util.Collector;
 /**
  * Non-grouped pre-reducer for tumbling eviction policy.
  */
-public class TumblingPreReducer<T> implements WindowBuffer<T>, CompletePreAggregator {
+public class TumblingPreReducer<T> extends WindowBuffer<T> implements PreAggregator {
 
 	private static final long serialVersionUID = 1L;
 
@@ -39,15 +39,14 @@ public class TumblingPreReducer<T> implements WindowBuffer<T>, CompletePreAggreg
 		this.serializer = serializer;
 	}
 
-	public boolean emitWindow(Collector<StreamWindow<T>> collector) {
+	public void emitWindow(Collector<StreamWindow<T>> collector) {
 		if (reduced != null) {
-			StreamWindow<T> currentWindow = new StreamWindow<T>();
+			StreamWindow<T> currentWindow = createEmptyWindow();
 			currentWindow.add(reduced);
 			collector.collect(currentWindow);
 			reduced = null;
-			return true;
-		} else {
-			return false;
+		} else if (emitEmpty) {
+			collector.collect(createEmptyWindow());
 		}
 	}
 
@@ -72,4 +71,10 @@ public class TumblingPreReducer<T> implements WindowBuffer<T>, CompletePreAggreg
 		return reduced.toString();
 	}
 
+	@Override
+	public WindowBuffer<T> emitEmpty() {
+		emitEmpty = true;
+		return this;
+	}
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/1377ca97/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/WindowBuffer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/WindowBuffer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/WindowBuffer.java
index 2dd50db..5c5ea52 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/WindowBuffer.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/WindowBuffer.java
@@ -23,17 +23,38 @@ import org.apache.flink.streaming.api.windowing.StreamWindow;
 import org.apache.flink.util.Collector;
 
 /**
- * Interface for defining specialized buffers to store/emit window data.
+ * Class for defining specialized buffers to store/emit window data.
  * Pre-aggregators should be implemented using this interface.
  */
-public interface WindowBuffer<T> extends Serializable, Cloneable {
+public abstract class WindowBuffer<T> implements Serializable, Cloneable {
 
-	public void store(T element) throws Exception;
+	private static final long serialVersionUID = 1L;
 
-	public void evict(int n);
+	protected Integer nextID = 1;
+	protected boolean sequentialID = false;
+	protected boolean emitEmpty = false;
+	protected boolean emitPerGroup = false;
 
-	public boolean emitWindow(Collector<StreamWindow<T>> collector);
+	public abstract void store(T element) throws Exception;
 
-	public WindowBuffer<T> clone();
+	public abstract void evict(int n);
+
+	public abstract void emitWindow(Collector<StreamWindow<T>> collector);
+
+	public abstract WindowBuffer<T> clone();
+
+	public WindowBuffer<T> emitEmpty() {
+		emitEmpty = true;
+		return this;
+	}
+
+	public WindowBuffer<T> sequentialID() {
+		sequentialID = true;
+		return this;
+	}
+
+	protected StreamWindow<T> createEmptyWindow() {
+		return sequentialID ? new StreamWindow<T>(nextID++) : new StreamWindow<T>();
+	}
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/1377ca97/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/ParallelMergeTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/ParallelMergeTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/ParallelMergeTest.java
new file mode 100644
index 0000000..d892c48
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/ParallelMergeTest.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.invokable.operator.windowing;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.util.List;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.windowing.StreamWindow;
+import org.apache.flink.streaming.api.windowing.windowbuffer.BasicWindowBufferTest.TestCollector;
+import org.junit.Test;
+
+public class ParallelMergeTest {
+
+	@Test
+	public void nonGroupedTest() throws Exception {
+
+		ReduceFunction<Integer> reducer = new ReduceFunction<Integer>() {
+			private static final long serialVersionUID = 1L;
+
+			@Override
+			public Integer reduce(Integer a, Integer b) throws Exception {
+				return a + b;
+			}
+		};
+
+		TestCollector<StreamWindow<Integer>> out = new TestCollector<StreamWindow<Integer>>();
+		List<StreamWindow<Integer>> output = out.getCollected();
+
+		ParallelMerge<Integer> merger = new ParallelMerge<Integer>(reducer);
+		merger.numberOfDiscretizers = 2;
+
+		merger.flatMap1(createTestWindow(1), out);
+		merger.flatMap1(createTestWindow(1), out);
+		merger.flatMap2(new Tuple2<Integer, Integer>(1, 1), out);
+		assertTrue(output.isEmpty());
+		merger.flatMap2(new Tuple2<Integer, Integer>(1, 1), out);
+		assertEquals(StreamWindow.fromElements(2), output.get(0));
+
+		merger.flatMap2(new Tuple2<Integer, Integer>(2, 2), out);
+		merger.flatMap1(createTestWindow(2), out);
+		merger.flatMap1(createTestWindow(2), out);
+		merger.flatMap2(new Tuple2<Integer, Integer>(2, 1), out);
+		assertEquals(1, output.size());
+		merger.flatMap1(createTestWindow(2), out);
+		assertEquals(StreamWindow.fromElements(3), output.get(1));
+
+		// check error handling
+		merger.flatMap1(createTestWindow(3), out);
+		merger.flatMap2(new Tuple2<Integer, Integer>(3, 1), out);
+		merger.flatMap2(new Tuple2<Integer, Integer>(3, 1), out);
+
+		merger.flatMap2(new Tuple2<Integer, Integer>(4, 1), out);
+		merger.flatMap2(new Tuple2<Integer, Integer>(4, 1), out);
+		merger.flatMap1(createTestWindow(4), out);
+		try {
+			merger.flatMap1(createTestWindow(4), out);
+			fail();
+		} catch (RuntimeException e) {
+			// Do nothing
+		}
+
+		ParallelMerge<Integer> merger2 = new ParallelMerge<Integer>(reducer);
+		merger2.numberOfDiscretizers = 2;
+		merger2.flatMap1(createTestWindow(0), out);
+		merger2.flatMap1(createTestWindow(1), out);
+		merger2.flatMap1(createTestWindow(1), out);
+		merger2.flatMap2(new Tuple2<Integer, Integer>(1, 1), out);
+		try {
+			merger2.flatMap2(new Tuple2<Integer, Integer>(1, 1), out);
+			fail();
+		} catch (RuntimeException e) {
+			// Do nothing
+		}
+
+	}
+
+	@Test
+	public void groupedTest() throws Exception {
+
+		TestCollector<StreamWindow<Integer>> out = new TestCollector<StreamWindow<Integer>>();
+		List<StreamWindow<Integer>> output = out.getCollected();
+
+		ParallelMerge<Integer> merger = new ParallelGroupedMerge<Integer>();
+		merger.numberOfDiscretizers = 2;
+
+		merger.flatMap1(createTestWindow(1), out);
+		merger.flatMap1(createTestWindow(1), out);
+		merger.flatMap2(new Tuple2<Integer, Integer>(1, 1), out);
+		assertTrue(output.isEmpty());
+		merger.flatMap2(new Tuple2<Integer, Integer>(1, 1), out);
+		assertEquals(StreamWindow.fromElements(1, 1), output.get(0));
+	}
+
+	private StreamWindow<Integer> createTestWindow(Integer id) {
+		StreamWindow<Integer> ret = new StreamWindow<Integer>(id);
+		ret.add(1);
+		return ret;
+	}
+}


Mime
View raw message