flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mbala...@apache.org
Subject [14/20] flink git commit: [streaming] WindowBuffer interface added for preaggregator logic + simple tumbling prereducer
Date Mon, 16 Feb 2015 14:25:40 GMT
[streaming] WindowBuffer interface added for preaggregator logic + simple tumbling prereducer


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ef7b7cd9
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ef7b7cd9
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ef7b7cd9

Branch: refs/heads/master
Commit: ef7b7cd968c13ce9df15e30e33300e0f26def303
Parents: 8708688
Author: Gyula Fora <gyfora@apache.org>
Authored: Tue Feb 10 12:21:09 2015 +0100
Committer: mbalassi <mbalassi@apache.org>
Committed: Mon Feb 16 13:06:08 2015 +0100

----------------------------------------------------------------------
 .../api/datastream/DiscretizedStream.java       | 47 ++++++-----
 .../api/datastream/WindowedDataStream.java      | 83 +++++++++++++++-----
 .../operator/StreamReduceInvokable.java         |  2 +-
 .../operator/windowing/BasicWindowBuffer.java   | 74 +++++++++++++++++
 .../windowing/GroupedStreamDiscretizer.java     | 10 ++-
 .../windowing/GroupedTimeDiscretizer.java       | 27 +------
 .../operator/windowing/StreamDiscretizer.java   | 54 ++++---------
 .../windowing/StreamWindowSerializer.java       | 10 +--
 .../operator/windowing/TumblingPreReducer.java  | 78 ++++++++++++++++++
 .../operator/windowing/WindowBuffer.java        | 36 +++++++++
 .../windowing/GroupedStreamDiscretizerTest.java |  2 +-
 .../windowing/StreamDiscretizerTest.java        |  6 +-
 .../windowing/WindowIntegrationTest.java        |  2 +
 .../examples/windowing/StockPrices.java         | 27 +++++--
 .../scala/examples/windowing/StockPrices.scala  | 29 ++++---
 .../flink/streaming/api/scala/package.scala     |  6 +-
 16 files changed, 360 insertions(+), 133 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ef7b7cd9/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java
index 55613b4..5541ba3 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java
@@ -28,11 +28,11 @@ import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.streaming.api.invokable.StreamInvokable;
-import org.apache.flink.streaming.api.invokable.operator.windowing.WindowFlattener;
 import org.apache.flink.streaming.api.invokable.operator.windowing.StreamWindow;
 import org.apache.flink.streaming.api.invokable.operator.windowing.StreamWindowTypeInfo;
-import org.apache.flink.streaming.api.invokable.operator.windowing.WindowMerger;
+import org.apache.flink.streaming.api.invokable.operator.windowing.WindowFlattener;
 import org.apache.flink.streaming.api.invokable.operator.windowing.WindowMapper;
+import org.apache.flink.streaming.api.invokable.operator.windowing.WindowMerger;
 import org.apache.flink.streaming.api.invokable.operator.windowing.WindowPartitioner;
 import org.apache.flink.streaming.api.invokable.operator.windowing.WindowReducer;
 
@@ -48,13 +48,15 @@ import org.apache.flink.streaming.api.invokable.operator.windowing.WindowReducer
  */
 public class DiscretizedStream<OUT> extends WindowedDataStream<OUT> {
 
-	protected SingleOutputStreamOperator<StreamWindow<OUT>, ?> discretizedStream;
+	private SingleOutputStreamOperator<StreamWindow<OUT>, ?> discretizedStream;
+	private WindowTransformation transformation;
 
 	protected DiscretizedStream(SingleOutputStreamOperator<StreamWindow<OUT>, ?> discretizedStream,
-			KeySelector<OUT, ?> groupByKey) {
+			KeySelector<OUT, ?> groupByKey, WindowTransformation tranformation) {
 		super();
 		this.groupByKey = groupByKey;
 		this.discretizedStream = discretizedStream;
+		this.transformation = tranformation;
 	}
 
 	/**
@@ -70,12 +72,13 @@ public class DiscretizedStream<OUT> extends WindowedDataStream<OUT> {
 	@Override
 	public DiscretizedStream<OUT> reduceWindow(ReduceFunction<OUT> reduceFunction) {
 
-		DiscretizedStream<OUT> out = partition(false).transform("Window Reduce", getType(),
+		DiscretizedStream<OUT> out = partition(transformation).transform(
+				WindowTransformation.REDUCEWINDOW, "Window Reduce", getType(),
 				new WindowReducer<OUT>(reduceFunction)).merge();
 
 		if (!isGrouped()) {
-			return out.transform("Window Reduce", out.getType(), new WindowReducer<OUT>(
-					reduceFunction));
+			return out.transform(WindowTransformation.REDUCEWINDOW, "Window Reduce", out.getType(),
+					new WindowReducer<OUT>(reduceFunction));
 		} else {
 			return out;
 		}
@@ -106,7 +109,8 @@ public class DiscretizedStream<OUT> extends WindowedDataStream<OUT> {
 	@Override
 	public <R> DiscretizedStream<R> mapWindow(GroupReduceFunction<OUT, R> reduceFunction,
 			TypeInformation<R> returnType) {
-		DiscretizedStream<R> out = partition(true).transform("Window Reduce", returnType,
+		DiscretizedStream<R> out = partition(transformation).transform(
+				WindowTransformation.REDUCEWINDOW, "Window Reduce", returnType,
 				new WindowMapper<OUT, R>(reduceFunction));
 
 		if (isGrouped()) {
@@ -116,30 +120,31 @@ public class DiscretizedStream<OUT> extends WindowedDataStream<OUT> {
 		}
 	}
 
-	private <R> DiscretizedStream<R> transform(String operatorName, TypeInformation<R> retType,
+	private <R> DiscretizedStream<R> transform(WindowTransformation transformation,
+			String operatorName, TypeInformation<R> retType,
 			StreamInvokable<StreamWindow<OUT>, StreamWindow<R>> invokable) {
 
 		return wrap(discretizedStream.transform(operatorName, new StreamWindowTypeInfo<R>(retType),
-				invokable));
+				invokable), transformation);
 	}
 
-	private DiscretizedStream<OUT> partition(boolean isMap) {
+	private DiscretizedStream<OUT> partition(WindowTransformation transformation) {
 
 		int parallelism = discretizedStream.getParallelism();
 
 		if (isGrouped()) {
-			DiscretizedStream<OUT> out = transform("Window partitioner", getType(),
+			DiscretizedStream<OUT> out = transform(transformation, "Window partitioner", getType(),
 					new WindowPartitioner<OUT>(groupByKey)).setParallelism(parallelism);
 
 			out.groupByKey = null;
 
 			return out;
-		} else if (!isMap) {
+		} else if (transformation == WindowTransformation.MAPWINDOW) {
 			return transform(
+					transformation,
 					"Window partitioner",
 					getType(),
-					new WindowPartitioner<OUT>(discretizedStream.environment
-							.getDegreeOfParallelism())).setParallelism(parallelism);
+					new WindowPartitioner<OUT>(parallelism)).setParallelism(parallelism);
 		} else {
 			return this;
 		}
@@ -160,9 +165,15 @@ public class DiscretizedStream<OUT> extends WindowedDataStream<OUT> {
 		return discretizedStream.transform("Window Flatten", getType(), new WindowFlattener<OUT>());
 	}
 
-	@SuppressWarnings({ "unchecked", "rawtypes" })
+	@SuppressWarnings("rawtypes")
 	private <R> DiscretizedStream<R> wrap(SingleOutputStreamOperator stream) {
-		return new DiscretizedStream<R>(stream, (KeySelector<R, ?>) this.groupByKey);
+		return wrap(stream, transformation);
+	}
+
+	@SuppressWarnings({ "unchecked", "rawtypes" })
+	private <R> DiscretizedStream<R> wrap(SingleOutputStreamOperator stream,
+			WindowTransformation transformation) {
+		return new DiscretizedStream<R>(stream, (KeySelector<R, ?>) this.groupByKey, transformation);
 	}
 
 	public DataStream<StreamWindow<OUT>> getDiscretizedStream() {
@@ -218,7 +229,7 @@ public class DiscretizedStream<OUT> extends WindowedDataStream<OUT> {
 	}
 
 	protected DiscretizedStream<OUT> copy() {
-		return new DiscretizedStream<OUT>(discretizedStream.copy(), groupByKey);
+		return new DiscretizedStream<OUT>(discretizedStream.copy(), groupByKey, transformation);
 	}
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ef7b7cd9/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
index 7a214fe..8fff958 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.streaming.api.datastream;
 
+import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.api.common.functions.GroupReduceFunction;
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.functions.RichGroupReduceFunction;
@@ -31,11 +32,14 @@ import org.apache.flink.streaming.api.function.aggregation.AggregationFunction.A
 import org.apache.flink.streaming.api.function.aggregation.ComparableAggregator;
 import org.apache.flink.streaming.api.function.aggregation.SumAggregator;
 import org.apache.flink.streaming.api.invokable.StreamInvokable;
+import org.apache.flink.streaming.api.invokable.operator.windowing.BasicWindowBuffer;
 import org.apache.flink.streaming.api.invokable.operator.windowing.GroupedStreamDiscretizer;
 import org.apache.flink.streaming.api.invokable.operator.windowing.GroupedTimeDiscretizer;
 import org.apache.flink.streaming.api.invokable.operator.windowing.StreamDiscretizer;
 import org.apache.flink.streaming.api.invokable.operator.windowing.StreamWindow;
 import org.apache.flink.streaming.api.invokable.operator.windowing.StreamWindowTypeInfo;
+import org.apache.flink.streaming.api.invokable.operator.windowing.TumblingPreReducer;
+import org.apache.flink.streaming.api.invokable.operator.windowing.WindowBuffer;
 import org.apache.flink.streaming.api.windowing.helper.Time;
 import org.apache.flink.streaming.api.windowing.helper.WindowingHelper;
 import org.apache.flink.streaming.api.windowing.policy.CloneableEvictionPolicy;
@@ -210,35 +214,75 @@ public class WindowedDataStream<OUT> {
 		return out;
 	}
 
-	private DiscretizedStream<OUT> discretize(boolean isMap) {
+	private DiscretizedStream<OUT> discretize(WindowTransformation transformation) {
 
-		StreamInvokable<OUT, StreamWindow<OUT>> discretizer = getDiscretizer();
+		StreamInvokable<OUT, StreamWindow<OUT>> discretizer = getDiscretizer(transformation,
+				getTrigger(), getEviction(), discretizerKey);
 
-		int parallelism = isLocal || (discretizerKey != null) ? dataStream.environment
-				.getDegreeOfParallelism() : 1;
+		int parallelism = getDiscretizerParallelism();
 
 		return new DiscretizedStream<OUT>(dataStream.transform("Stream Discretizer",
 				new StreamWindowTypeInfo<OUT>(getType()), discretizer).setParallelism(parallelism),
-				groupByKey);
+				groupByKey, transformation);
+
+	}
+
+	protected enum WindowTransformation {
+
+		REDUCEWINDOW, MAPWINDOW, NONE;
+
+		private Function UDF;
+
+		public WindowTransformation with(Function UDF) {
+			this.UDF = UDF;
+			return this;
+		}
+	}
 
+	private int getDiscretizerParallelism() {
+		return isLocal || (discretizerKey != null) ? dataStream.environment
+				.getDegreeOfParallelism() : 1;
 	}
 
-	private StreamInvokable<OUT, StreamWindow<OUT>> getDiscretizer() {
+	private StreamInvokable<OUT, StreamWindow<OUT>> getDiscretizer(
+			WindowTransformation transformation, TriggerPolicy<OUT> trigger,
+			EvictionPolicy<OUT> eviction, KeySelector<OUT, ?> discretizerKey) {
+
+		WindowBuffer<OUT> windowBuffer = getWindowBuffer(transformation, trigger, eviction,
+				discretizerKey);
+
 		if (discretizerKey == null) {
-			return new StreamDiscretizer<OUT>(getTrigger(), getEvicter());
-		} else if (getTrigger() instanceof TimeTriggerPolicy
-				&& ((TimeTriggerPolicy<OUT>) getTrigger()).timestampWrapper.isDefaultTimestamp()) {
+			return new StreamDiscretizer<OUT>(trigger, eviction, windowBuffer);
+		} else if (trigger instanceof TimeTriggerPolicy
+				&& ((TimeTriggerPolicy<OUT>) trigger).timestampWrapper.isDefaultTimestamp()) {
 			return new GroupedTimeDiscretizer<OUT>(discretizerKey,
-					(TimeTriggerPolicy<OUT>) getTrigger(),
-					(CloneableEvictionPolicy<OUT>) getEvicter());
+					(TimeTriggerPolicy<OUT>) trigger, (CloneableEvictionPolicy<OUT>) eviction,
+					windowBuffer);
 		} else {
 			return new GroupedStreamDiscretizer<OUT>(discretizerKey,
-					(CloneableTriggerPolicy<OUT>) getTrigger(),
-					(CloneableEvictionPolicy<OUT>) getEvicter());
+					(CloneableTriggerPolicy<OUT>) trigger, (CloneableEvictionPolicy<OUT>) eviction,
+					windowBuffer);
 		}
 
 	}
 
+	@SuppressWarnings("unchecked")
+	private WindowBuffer<OUT> getWindowBuffer(WindowTransformation transformation,
+			TriggerPolicy<OUT> trigger, EvictionPolicy<OUT> eviction,
+			KeySelector<OUT, ?> discretizerKey) {
+
+		if (transformation == WindowTransformation.REDUCEWINDOW
+				&& eviction instanceof TumblingEvictionPolicy) {
+			if (discretizerKey == null) {
+				return new TumblingPreReducer<OUT>((ReduceFunction<OUT>) transformation.UDF,
+						getType().createSerializer());
+			} else {
+				return new BasicWindowBuffer<OUT>();
+			}
+		}
+		return new BasicWindowBuffer<OUT>();
+	}
+
 	/**
 	 * Applies a reduce transformation on the windowed data stream by reducing
 	 * the current window at every trigger.The user can also extend the
@@ -250,7 +294,8 @@ public class WindowedDataStream<OUT> {
 	 * @return The transformed DataStream
 	 */
 	public DiscretizedStream<OUT> reduceWindow(ReduceFunction<OUT> reduceFunction) {
-		return discretize(false).reduceWindow(reduceFunction);
+		return discretize(WindowTransformation.REDUCEWINDOW.with(reduceFunction)).reduceWindow(
+				reduceFunction);
 	}
 
 	/**
@@ -267,7 +312,8 @@ public class WindowedDataStream<OUT> {
 	 * @return The transformed DataStream
 	 */
 	public <R> WindowedDataStream<R> mapWindow(GroupReduceFunction<OUT, R> reduceFunction) {
-		return discretize(true).mapWindow(reduceFunction);
+		return discretize(WindowTransformation.MAPWINDOW.with(reduceFunction)).mapWindow(
+				reduceFunction);
 	}
 
 	/**
@@ -289,7 +335,8 @@ public class WindowedDataStream<OUT> {
 	public <R> WindowedDataStream<R> mapWindow(GroupReduceFunction<OUT, R> reduceFunction,
 			TypeInformation<R> outType) {
 
-		return discretize(true).mapWindow(reduceFunction, outType);
+		return discretize(WindowTransformation.MAPWINDOW.with(reduceFunction)).mapWindow(
+				reduceFunction, outType);
 	}
 
 	public DataStream<OUT> flatten() {
@@ -533,7 +580,7 @@ public class WindowedDataStream<OUT> {
 
 	}
 
-	protected EvictionPolicy<OUT> getEvicter() {
+	protected EvictionPolicy<OUT> getEviction() {
 
 		if (evictionHelper != null) {
 			return evictionHelper.toEvict();
@@ -571,7 +618,7 @@ public class WindowedDataStream<OUT> {
 	}
 
 	public DataStream<StreamWindow<OUT>> getDiscretizedStream() {
-		return discretize(true).getDiscretizedStream();
+		return discretize(WindowTransformation.NONE).getDiscretizedStream();
 	}
 
 	protected static class WindowKey<R> implements KeySelector<StreamWindow<R>, Integer> {

http://git-wip-us.apache.org/repos/asf/flink/blob/ef7b7cd9/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceInvokable.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceInvokable.java
index e1a56cc..fe6c41a 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceInvokable.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceInvokable.java
@@ -51,7 +51,7 @@ public class StreamReduceInvokable<IN> extends ChainableInvokable<IN, IN> {
 		nextValue = nextObject;
 
 		if (currentValue != null) {
-			currentValue = reducer.reduce(currentValue, nextValue);
+			currentValue = reducer.reduce(copy(currentValue), nextValue);
 		} else {
 			currentValue = nextValue;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ef7b7cd9/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/BasicWindowBuffer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/BasicWindowBuffer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/BasicWindowBuffer.java
new file mode 100644
index 0000000..4c6e7cd
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/BasicWindowBuffer.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.invokable.operator.windowing;
+
+import java.util.LinkedList;
+import java.util.NoSuchElementException;
+
+import org.apache.flink.util.Collector;
+
+public class BasicWindowBuffer<T> implements WindowBuffer<T> {
+
+	private static final long serialVersionUID = 1L;
+	protected LinkedList<T> buffer;
+
+	public BasicWindowBuffer() {
+		this.buffer = new LinkedList<T>();
+	}
+
+	public boolean emitWindow(Collector<StreamWindow<T>> collector) {
+		if (!buffer.isEmpty()) {
+			StreamWindow<T> currentWindow = new StreamWindow<T>();
+			currentWindow.addAll(buffer);
+			collector.collect(currentWindow);
+			return true;
+		} else {
+			return false;
+		}
+	}
+
+	public void store(T element) throws Exception {
+		buffer.add(element);
+	}
+
+	public void evict(int n) {
+		for (int i = 0; i < n; i++) {
+			try {
+				buffer.removeFirst();
+			} catch (NoSuchElementException e) {
+				// In case no more elements are in the buffer:
+				// Prevent failure and stop deleting.
+				break;
+			}
+		}
+	}
+
+	public int size() {
+		return buffer.size();
+	}
+
+	@Override
+	public BasicWindowBuffer<T> clone() {
+		return new BasicWindowBuffer<T>();
+	}
+
+	@Override
+	public String toString() {
+		return buffer.toString();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ef7b7cd9/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/GroupedStreamDiscretizer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/GroupedStreamDiscretizer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/GroupedStreamDiscretizer.java
index e90726d..ae6a2d9 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/GroupedStreamDiscretizer.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/GroupedStreamDiscretizer.java
@@ -37,11 +37,13 @@ public class GroupedStreamDiscretizer<IN> extends StreamInvokable<IN, StreamWind
 	protected Configuration parameters;
 	protected CloneableTriggerPolicy<IN> triggerPolicy;
 	protected CloneableEvictionPolicy<IN> evictionPolicy;
+	protected WindowBuffer<IN> windowBuffer;
 
 	protected Map<Object, StreamDiscretizer<IN>> groupedDiscretizers;
 
 	public GroupedStreamDiscretizer(KeySelector<IN, ?> keySelector,
-			CloneableTriggerPolicy<IN> triggerPolicy, CloneableEvictionPolicy<IN> evictionPolicy) {
+			CloneableTriggerPolicy<IN> triggerPolicy, CloneableEvictionPolicy<IN> evictionPolicy,
+			WindowBuffer<IN> windowBuffer) {
 
 		super(null);
 
@@ -51,6 +53,7 @@ public class GroupedStreamDiscretizer<IN> extends StreamInvokable<IN, StreamWind
 		this.evictionPolicy = evictionPolicy;
 
 		this.groupedDiscretizers = new HashMap<Object, StreamDiscretizer<IN>>();
+		this.windowBuffer = windowBuffer;
 
 	}
 
@@ -76,9 +79,8 @@ public class GroupedStreamDiscretizer<IN> extends StreamInvokable<IN, StreamWind
 			readNext();
 		}
 
-		// finally trigger the buffer.
 		for (StreamDiscretizer<IN> group : groupedDiscretizers.values()) {
-			group.emitFinalWindow();
+			group.emitWindow();
 		}
 
 	}
@@ -95,7 +97,7 @@ public class GroupedStreamDiscretizer<IN> extends StreamInvokable<IN, StreamWind
 	protected StreamDiscretizer<IN> makeNewGroup(Object key) throws Exception {
 
 		StreamDiscretizer<IN> groupDiscretizer = new StreamDiscretizer<IN>(triggerPolicy.clone(),
-				evictionPolicy.clone());
+				evictionPolicy.clone(), windowBuffer.clone());
 
 		groupDiscretizer.collector = taskContext.getOutputCollector();
 		groupDiscretizer.open(this.parameters);

http://git-wip-us.apache.org/repos/asf/flink/blob/ef7b7cd9/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/GroupedTimeDiscretizer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/GroupedTimeDiscretizer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/GroupedTimeDiscretizer.java
index 63901a8..6d38ed9 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/GroupedTimeDiscretizer.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/GroupedTimeDiscretizer.java
@@ -17,10 +17,6 @@
 
 package org.apache.flink.streaming.api.invokable.operator.windowing;
 
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map.Entry;
-
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.streaming.api.windowing.policy.CloneableEvictionPolicy;
 import org.apache.flink.streaming.api.windowing.policy.TimeTriggerPolicy;
@@ -33,9 +29,10 @@ public class GroupedTimeDiscretizer<IN> extends GroupedStreamDiscretizer<IN> {
 	private Thread policyThread;
 
 	public GroupedTimeDiscretizer(KeySelector<IN, ?> keySelector,
-			TimeTriggerPolicy<IN> triggerPolicy, CloneableEvictionPolicy<IN> evictionPolicy) {
+			TimeTriggerPolicy<IN> triggerPolicy, CloneableEvictionPolicy<IN> evictionPolicy,
+			WindowBuffer<IN> windowBuffer) {
 
-		super(keySelector, triggerPolicy, evictionPolicy);
+		super(keySelector, triggerPolicy, evictionPolicy, windowBuffer);
 		this.timeTriggerPolicy = triggerPolicy;
 	}
 
@@ -43,7 +40,7 @@ public class GroupedTimeDiscretizer<IN> extends GroupedStreamDiscretizer<IN> {
 	protected StreamDiscretizer<IN> makeNewGroup(Object key) throws Exception {
 
 		StreamDiscretizer<IN> groupDiscretizer = new StreamDiscretizer<IN>(triggerPolicy.clone(),
-				evictionPolicy.clone());
+				evictionPolicy.clone(), windowBuffer.clone());
 
 		groupDiscretizer.collector = taskContext.getOutputCollector();
 		// We omit the groupDiscretizer.open(...) call here to avoid starting
@@ -60,20 +57,6 @@ public class GroupedTimeDiscretizer<IN> extends GroupedStreamDiscretizer<IN> {
 		policyThread.start();
 	}
 
-	private void removeUnusedGroups(int threshold) {
-		List<Object> toRemove = new ArrayList<Object>();
-
-		for (Entry<Object, StreamDiscretizer<IN>> entry : groupedDiscretizers.entrySet()) {
-			if (entry.getValue().emptyCount > threshold) {
-				toRemove.add(entry.getKey());
-			}
-		}
-
-		for (Object key : toRemove) {
-			groupedDiscretizers.remove(key);
-		}
-	}
-
 	private class TimeCheck implements Runnable {
 
 		@Override
@@ -93,8 +76,6 @@ public class GroupedTimeDiscretizer<IN> extends GroupedStreamDiscretizer<IN> {
 						group.triggerOnFakeElement(fake);
 					}
 				}
-
-				removeUnusedGroups(10);
 			}
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/ef7b7cd9/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/StreamDiscretizer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/StreamDiscretizer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/StreamDiscretizer.java
index ac02af3..6b0bcec 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/StreamDiscretizer.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/StreamDiscretizer.java
@@ -17,9 +17,6 @@
 
 package org.apache.flink.streaming.api.invokable.operator.windowing;
 
-import java.util.LinkedList;
-import java.util.NoSuchElementException;
-
 import org.apache.flink.streaming.api.invokable.StreamInvokable;
 import org.apache.flink.streaming.api.windowing.policy.ActiveEvictionPolicy;
 import org.apache.flink.streaming.api.windowing.policy.ActiveTriggerCallback;
@@ -39,10 +36,12 @@ public class StreamDiscretizer<IN> extends StreamInvokable<IN, StreamWindow<IN>>
 	private boolean isActiveTrigger;
 	private boolean isActiveEviction;
 	private Thread activePolicyThread;
-	protected LinkedList<IN> buffer;
 	public int emptyCount = 0;
 
-	public StreamDiscretizer(TriggerPolicy<IN> triggerPolicy, EvictionPolicy<IN> evictionPolicy) {
+	protected WindowBuffer<IN> bufferHandler;
+
+	public StreamDiscretizer(TriggerPolicy<IN> triggerPolicy, EvictionPolicy<IN> evictionPolicy,
+			WindowBuffer<IN> bufferHandler) {
 		super(null);
 
 		this.triggerPolicy = triggerPolicy;
@@ -51,7 +50,7 @@ public class StreamDiscretizer<IN> extends StreamInvokable<IN, StreamWindow<IN>>
 		this.isActiveTrigger = triggerPolicy instanceof ActiveTriggerPolicy;
 		this.isActiveEviction = evictionPolicy instanceof ActiveEvictionPolicy;
 
-		this.buffer = new LinkedList<IN>();
+		this.bufferHandler = bufferHandler;
 	}
 
 	@Override
@@ -66,7 +65,7 @@ public class StreamDiscretizer<IN> extends StreamInvokable<IN, StreamWindow<IN>>
 			activePolicyThread.interrupt();
 		}
 
-		emitFinalWindow();
+		emitWindow();
 
 	}
 
@@ -77,8 +76,9 @@ public class StreamDiscretizer<IN> extends StreamInvokable<IN, StreamWindow<IN>>
 	 * 
 	 * @param input
 	 *            a real input element
+	 * @throws Exception
 	 */
-	protected synchronized void processRealElement(IN input) {
+	protected synchronized void processRealElement(IN input) throws Exception {
 
 		if (isActiveTrigger) {
 			ActiveTriggerPolicy<IN> trigger = (ActiveTriggerPolicy<IN>) triggerPolicy;
@@ -97,7 +97,7 @@ public class StreamDiscretizer<IN> extends StreamInvokable<IN, StreamWindow<IN>>
 
 		evict(input, isTriggered);
 
-		buffer.add(input);
+		bufferHandler.store(input);
 
 	}
 
@@ -124,10 +124,7 @@ public class StreamDiscretizer<IN> extends StreamInvokable<IN, StreamWindow<IN>>
 	 * if not empty
 	 */
 	protected void emitWindow() {
-		if (!buffer.isEmpty()) {
-			StreamWindow<IN> currentWindow = new StreamWindow<IN>();
-			currentWindow.addAll(buffer);
-			collector.collect(currentWindow);
+		if (bufferHandler.emitWindow(collector)) {
 			emptyCount = 0;
 		} else {
 			emptyCount++;
@@ -139,37 +136,16 @@ public class StreamDiscretizer<IN> extends StreamInvokable<IN, StreamWindow<IN>>
 
 		if (isActiveEviction) {
 			ActiveEvictionPolicy<IN> ep = (ActiveEvictionPolicy<IN>) evictionPolicy;
-			numToEvict = ep.notifyEvictionWithFakeElement(input, buffer.size());
+			numToEvict = ep.notifyEvictionWithFakeElement(input, bufferHandler.size());
 		}
 
-		evictFromBuffer(numToEvict);
+		bufferHandler.evict(numToEvict);
 	}
 
 	private void evict(IN input, boolean isTriggered) {
-		int numToEvict = evictionPolicy.notifyEviction(input, isTriggered, buffer.size());
+		int numToEvict = evictionPolicy.notifyEviction(input, isTriggered, bufferHandler.size());
 
-		evictFromBuffer(numToEvict);
-	}
-
-	private void evictFromBuffer(int n) {
-		for (int i = 0; i < n; i++) {
-			try {
-				buffer.removeFirst();
-			} catch (NoSuchElementException e) {
-				// In case no more elements are in the buffer:
-				// Prevent failure and stop deleting.
-				break;
-			}
-		}
-	}
-
-	/**
-	 * This function emits the partial windows at the end of the stream
-	 */
-	protected void emitFinalWindow() {
-		if (!buffer.isEmpty()) {
-			emitWindow();
-		}
+		bufferHandler.evict(numToEvict);
 	}
 
 	@Override
@@ -189,7 +165,7 @@ public class StreamDiscretizer<IN> extends StreamInvokable<IN, StreamWindow<IN>>
 
 	@Override
 	public String toString() {
-		return buffer.toString();
+		return bufferHandler.toString();
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/ef7b7cd9/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/StreamWindowSerializer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/StreamWindowSerializer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/StreamWindowSerializer.java
index 70c608c..12e3574 100755
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/StreamWindowSerializer.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/StreamWindowSerializer.java
@@ -47,11 +47,6 @@ public final class StreamWindowSerializer<T> extends TypeSerializer<StreamWindow
 	}
 
 	@Override
-	public boolean isStateful() {
-		return false;
-	}
-
-	@Override
 	public StreamWindow<T> createInstance() {
 		return new StreamWindow<T>(0, 0, 0);
 	}
@@ -133,4 +128,9 @@ public final class StreamWindowSerializer<T> extends TypeSerializer<StreamWindow
 	public void copy(DataInputView source, DataOutputView target) throws IOException {
 		// Needs to be implemented
 	}
+
+	@Override
+	public TypeSerializer<StreamWindow<T>> duplicate() {
+		return this;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ef7b7cd9/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/TumblingPreReducer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/TumblingPreReducer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/TumblingPreReducer.java
new file mode 100644
index 0000000..4f05813
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/TumblingPreReducer.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.invokable.operator.windowing;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.util.Collector;
+
+public class TumblingPreReducer<T> implements WindowBuffer<T> {
+
+	private static final long serialVersionUID = 1L;
+
+	private ReduceFunction<T> reducer;
+
+	private T reduced;
+	private int numOfElements = 0;
+	private TypeSerializer<T> serializer;
+
+	public TumblingPreReducer(ReduceFunction<T> reducer, TypeSerializer<T> serializer) {
+		this.reducer = reducer;
+		this.serializer = serializer;
+	}
+
+	public boolean emitWindow(Collector<StreamWindow<T>> collector) {
+		if (reduced != null) {
+			StreamWindow<T> currentWindow = new StreamWindow<T>();
+			currentWindow.add(reduced);
+			collector.collect(currentWindow);
+			reduced = null;
+			numOfElements = 0;
+			return true;
+		} else {
+			return false;
+		}
+	}
+
+	public void store(T element) throws Exception {
+		if (reduced == null) {
+			reduced = element;
+		} else {
+			reduced = reducer.reduce(serializer.copy(reduced), element);
+		}
+		numOfElements++;
+	}
+
+	public void evict(int n) {
+	}
+
+	public int size() {
+		return numOfElements;
+	}
+
+	@Override
+	public TumblingPreReducer<T> clone() {
+		return new TumblingPreReducer<T>(reducer, serializer);
+	}
+
+	@Override
+	public String toString() {
+		return reduced.toString();
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ef7b7cd9/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowBuffer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowBuffer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowBuffer.java
new file mode 100644
index 0000000..ef8fc2b
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowBuffer.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.invokable.operator.windowing;
+
+import java.io.Serializable;
+
+import org.apache.flink.util.Collector;
+
+public interface WindowBuffer<T> extends Serializable {
+
+	public void store(T element) throws Exception;
+
+	public void evict(int n);
+
+	public boolean emitWindow(Collector<StreamWindow<T>> collector);
+
+	public int size();
+	
+	public WindowBuffer<T> clone();
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ef7b7cd9/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/GroupedStreamDiscretizerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/GroupedStreamDiscretizerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/GroupedStreamDiscretizerTest.java
index 8a6a7ce..04f09e0 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/GroupedStreamDiscretizerTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/GroupedStreamDiscretizerTest.java
@@ -82,7 +82,7 @@ public class GroupedStreamDiscretizerTest {
 		CloneableEvictionPolicy<Integer> eviction = new TumblingEvictionPolicy<Integer>();
 
 		GroupedStreamDiscretizer<Integer> discretizer = new GroupedStreamDiscretizer<Integer>(
-				keySelector, trigger, eviction);
+				keySelector, trigger, eviction, new BasicWindowBuffer<Integer>());
 
 		List<StreamWindow<Integer>> result = MockContext.createAndExecute(discretizer, inputs);
 		assertEquals(expected, new HashSet<StreamWindow<Integer>>(result));

http://git-wip-us.apache.org/repos/asf/flink/blob/ef7b7cd9/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/StreamDiscretizerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/StreamDiscretizerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/StreamDiscretizerTest.java
index 8fcac8c..706b0ef 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/StreamDiscretizerTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/StreamDiscretizerTest.java
@@ -74,7 +74,8 @@ public class StreamDiscretizerTest {
 		EvictionPolicy<Integer> eviction = new TimeEvictionPolicy<Integer>(4L,
 				new TimestampWrapper<Integer>(myTimeStamp, 1));
 
-		StreamDiscretizer<Integer> discretizer = new StreamDiscretizer<Integer>(trigger, eviction);
+		StreamDiscretizer<Integer> discretizer = new StreamDiscretizer<Integer>(trigger, eviction,
+				new BasicWindowBuffer<Integer>());
 
 		List<StreamWindow<Integer>> result = MockContext.createAndExecute(discretizer, inputs);
 		assertEquals(expected, result);
@@ -99,7 +100,8 @@ public class StreamDiscretizerTest {
 
 		EvictionPolicy<Integer> eviction = new TumblingEvictionPolicy<Integer>();
 
-		StreamDiscretizer<Integer> discretizer = new StreamDiscretizer<Integer>(trigger, eviction);
+		StreamDiscretizer<Integer> discretizer = new StreamDiscretizer<Integer>(trigger, eviction,
+				new BasicWindowBuffer<Integer>());
 
 		List<StreamWindow<Integer>> result = MockContext.createAndExecute(discretizer, inputs);
 		assertEquals(expected, result);

http://git-wip-us.apache.org/repos/asf/flink/blob/ef7b7cd9/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowIntegrationTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowIntegrationTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowIntegrationTest.java
index c1687e6..bcde0c5 100755
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowIntegrationTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowIntegrationTest.java
@@ -101,6 +101,8 @@ public class WindowIntegrationTest implements Serializable {
 		source.groupBy(new ModKey(3)).window(Count.of(2)).groupBy(new ModKey(2))
 				.mapWindow(new IdentityWindowMap()).flatten().addSink(new DistributedSink2());
 
+		env.generateSequence(1, 10).window(Count.of(3)).sum(0).getDiscretizedStream().print();
+		
 		env.execute();
 
 		// sum ( Count of 2 slide 3 )

http://git-wip-us.apache.org/repos/asf/flink/blob/ef7b7cd9/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/StockPrices.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/StockPrices.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/StockPrices.java
index ce5db4a..af10e52 100644
--- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/StockPrices.java
+++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/StockPrices.java
@@ -78,6 +78,7 @@ public class StockPrices {
 	// PROGRAM
 	// *************************************************************************
 
+	@SuppressWarnings({ "serial", "unused" })
 	public static void main(String[] args) throws Exception {
 
 		if (!parseParameters(args)) {
@@ -108,6 +109,7 @@ public class StockPrices {
 		DataStream<StockPrice> BUX_stream = env.addSource(new StockSource("BUX", 40));
 
 		//Merge all stock streams together
+		@SuppressWarnings("unchecked")
 		DataStream<StockPrice> stockStream = socketStockStream.merge(SPX_stream, FTSE_stream, DJI_stream, BUX_stream);
 		
 		//Step 2
@@ -116,9 +118,9 @@ public class StockPrices {
 				.window(Time.of(10, TimeUnit.SECONDS))
 				.every(Time.of(5, TimeUnit.SECONDS));
 
-		DataStream<StockPrice> lowest = windowedStream.minBy("price").setParallelism(1);
-		DataStream<StockPrice> maxByStock = windowedStream.groupBy("symbol").maxBy("price");
-		DataStream<StockPrice> rollingMean = windowedStream.groupBy("symbol").reduceGroup(new MeanReduce());
+		DataStream<StockPrice> lowest = windowedStream.minBy("price").flatten();
+		DataStream<StockPrice> maxByStock = windowedStream.groupBy("symbol").maxBy("price").flatten();
+		DataStream<StockPrice> rollingMean = windowedStream.groupBy("symbol").mapWindow(new MeanReduce()).flatten();
 
 		//Step 3
 		//Use delta policy to create price change warnings, and also count the number of warnings every half minute
@@ -130,7 +132,7 @@ public class StockPrices {
 						return Math.abs(oldDataPoint.price - newDataPoint.price);
 					}
 				}, DEFAULT_STOCK_PRICE))
-				.reduceGroup(new SendWarning());
+				.mapWindow(new SendWarning()).flatten();
 
 
 		DataStream<Count> warningsPerStock = priceWarnings.map(new MapFunction<String, Count>() {
@@ -138,7 +140,7 @@ public class StockPrices {
 			public Count map(String value) throws Exception {
 				return new Count(value, 1);
 			}
-		}).groupBy("symbol").window(Time.of(30, TimeUnit.SECONDS)).sum("count");
+		}).groupBy("symbol").window(Time.of(30, TimeUnit.SECONDS)).sum("count").flatten();
 
 		//Step 4
 		//Read a stream of tweets and extract the stock symbols
@@ -166,7 +168,7 @@ public class StockPrices {
 			}
 		}).groupBy("symbol")
 				.window(Time.of(30, TimeUnit.SECONDS))
-				.sum("count");
+				.sum("count").flatten();
 
 		//Step 5
 		//For advanced analysis we join the number of tweets and the number of price change warnings by stock
@@ -186,8 +188,7 @@ public class StockPrices {
 
 		DataStream<Double> rollingCorrelation = tweetsAndWarning
 				.window(Time.of(30, TimeUnit.SECONDS))
-				.reduceGroup(new CorrelationReduce())
-				.setParallelism(1);
+				.mapWindow(new CorrelationReduce()).flatten();
 
 		if (fileOutput) {
 			rollingCorrelation.writeAsText(outputPath, 1);
@@ -205,6 +206,7 @@ public class StockPrices {
 
 	public static class StockPrice implements Serializable {
 
+		private static final long serialVersionUID = 1L;
 		public String symbol;
 		public Double price;
 
@@ -226,6 +228,8 @@ public class StockPrices {
 	}
 
 	public static class Count implements Serializable {
+		
+		private static final long serialVersionUID = 1L;
 		public String symbol;
 		public Integer count;
 
@@ -252,6 +256,7 @@ public class StockPrices {
 
 	public final static class StockSource implements SourceFunction<StockPrice> {
 
+		private static final long serialVersionUID = 1L;
 		private Double price;
 		private String symbol;
 		private Integer sigma;
@@ -276,6 +281,7 @@ public class StockPrices {
 
 	public final static class MeanReduce implements GroupReduceFunction<StockPrice, StockPrice> {
 
+		private static final long serialVersionUID = 1L;
 		private Double sum = 0.0;
 		private Integer count = 0;
 		private String symbol = "";
@@ -296,6 +302,7 @@ public class StockPrices {
 
 	public static final class TweetSource implements SourceFunction<String> {
 
+		private static final long serialVersionUID = 1L;
 		Random random;
 		StringBuilder stringBuilder;
 
@@ -318,6 +325,9 @@ public class StockPrices {
 	}
 
 	public static final class SendWarning implements GroupReduceFunction<StockPrice, String> {
+
+		private static final long serialVersionUID = 1L;
+
 		@Override
 		public void reduce(Iterable<StockPrice> values, Collector<String> out) throws Exception {
 			if (values.iterator().hasNext()) {
@@ -328,6 +338,7 @@ public class StockPrices {
 
 	public static final class CorrelationReduce implements GroupReduceFunction<Tuple2<Integer, Integer>, Double> {
 
+		private static final long serialVersionUID = 1L;
 		private Integer leftSum;
 		private Integer rightSum;
 		private Integer count;

http://git-wip-us.apache.org/repos/asf/flink/blob/ef7b7cd9/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/StockPrices.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/StockPrices.scala b/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/StockPrices.scala
index 8a0ce5e..222a0a7 100644
--- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/StockPrices.scala
+++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/StockPrices.scala
@@ -80,7 +80,7 @@ object StockPrices {
     //Read a stream of stock prices from different sources and merge it into one stream
 
     //Read from a socket stream and map it to StockPrice objects
-    val socketStockStream = env.socketTextStream("localhost", 9999).map(x => {
+    val socketStockStream = env.socketTextStream(hostName, port).map(x => {
       val split = x.split(",")
       StockPrice(split(0), split(1).toDouble)
     })
@@ -98,9 +98,9 @@ object StockPrices {
     //Compute some simple statistics on a rolling window
     val windowedStream = stockStream.window(Time.of(10, SECONDS)).every(Time.of(5, SECONDS))
 
-    val lowest = windowedStream.minBy("price").setParallelism(1)
-    val maxByStock = windowedStream.groupBy("symbol").maxBy("price")
-    val rollingMean = windowedStream.groupBy("symbol").reduceGroup(mean _)
+    val lowest = windowedStream.minBy("price")
+    val maxByStock = windowedStream.groupBy("symbol").maxBy("price").getDiscretizedStream
+    val rollingMean = windowedStream.groupBy("symbol").mapWindow(mean _).getDiscretizedStream
 
     //Step 3 
     //Use  delta policy to create price change warnings,
@@ -108,13 +108,13 @@ object StockPrices {
 
     val priceWarnings = stockStream.groupBy("symbol")
       .window(Delta.of(0.05, priceChange, defaultPrice))
-      .reduceGroup(sendWarning _)
-
+      .mapWindow(sendWarning _)
+      
     val warningsPerStock = priceWarnings.map(Count(_, 1))
-      .groupBy("symbol")
       .window(Time.of(30, SECONDS))
+      .groupBy("symbol")
       .sum("count")
-
+      
     //Step 4 
     //Read a stream of tweets and extract the stock symbols
 
@@ -125,10 +125,10 @@ object StockPrices {
       .filter(symbols.contains(_))                     
                     
     val tweetsPerStock = mentionedSymbols.map(Count(_, 1))
-      .groupBy("symbol")
       .window(Time.of(30, SECONDS))
+      .groupBy("symbol")
       .sum("count")
-
+      
     //Step 5
     //For advanced analysis we join the number of tweets and
     //the number of price change warnings by stock
@@ -140,11 +140,16 @@ object StockPrices {
       .onWindow(30, SECONDS)
       .where("symbol")
       .equalTo("symbol") { (c1, c2) => (c1.count, c2.count) }
+    
 
     val rollingCorrelation = tweetsAndWarning.window(Time.of(30, SECONDS))
-      .reduceGroup(computeCorrelation _).setParallelism(1)
+      .mapWindow(computeCorrelation _)
 
-    rollingCorrelation.print
+    if (fileOutput) {
+      rollingCorrelation.writeAsText(outputPath, 1);
+    } else {
+      rollingCorrelation.print;
+    }
 
     env.execute("Stock stream")
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/ef7b7cd9/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/package.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/package.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/package.scala
index 61d7db1..cdda8f2 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/package.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/package.scala
@@ -26,8 +26,8 @@ import org.apache.flink.streaming.api.datastream.{ DataStream => JavaStream }
 import org.apache.flink.streaming.api.datastream.{ WindowedDataStream => JavaWStream }
 import org.apache.flink.streaming.api.datastream.{ SplitDataStream => SplitJavaStream }
 import org.apache.flink.streaming.api.datastream.{ ConnectedDataStream => JavaConStream }
-
 import language.implicitConversions
+import org.apache.flink.streaming.api.invokable.operator.windowing.StreamWindow
 
 package object scala {
   // We have this here so that we always have generated TypeInformationS when
@@ -48,7 +48,9 @@ package object scala {
 
   implicit def seqToFlinkSource[T: ClassTag: TypeInformation](scalaSeq: Seq[T]) : DataStream[T] =
     StreamExecutionEnvironment.getExecutionEnvironment.fromCollection(scalaSeq)
-
+    
+  implicit def windowedToDataStream[R](windowedStream: WindowedDataStream[R]): DataStream[R] =
+    windowedStream.flatten      
 
   private[flink] def fieldNames2Indices(
       typeInfo: TypeInformation[_],


Mime
View raw message