flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aljos...@apache.org
Subject flink git commit: [FLINK-3528] Add FoldingWindowBuffer for Non-Keyed Windows
Date Fri, 26 Feb 2016 23:38:31 GMT
Repository: flink
Updated Branches:
  refs/heads/release-1.0 ba069f35b -> 75e03cacc


[FLINK-3528] Add FoldingWindowBuffer for Non-Keyed Windows

This makes AllWindowedStream.fold() take constant space, just like the
keyed WindowOperator.

Also this adds a new test case in EventTimeAllWindowCheckpointingITCase
to verify that the FoldingWindowBuffer works.

This also renames the preexisting window buffers to ReducingWindowBuffer
and ListWindowBuffer to make the naming scheme consistent.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/75e03cac
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/75e03cac
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/75e03cac

Branch: refs/heads/release-1.0
Commit: 75e03caccad159fb04df4c7085a49d7f76e994c5
Parents: ba069f3
Author: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Authored: Fri Feb 26 23:27:26 2016 +0100
Committer: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Committed: Sat Feb 27 00:30:33 2016 +0100

----------------------------------------------------------------------
 .../api/common/functions/FoldFunction.java      |   2 +-
 .../api/datastream/AllWindowedStream.java       |  27 +--
 .../EvictingNonKeyedWindowOperator.java         |   9 +-
 .../windowing/NonKeyedWindowOperator.java       |  41 ++---
 .../windowing/buffers/EvictingWindowBuffer.java |   3 +-
 .../windowing/buffers/FoldingWindowBuffer.java  | 163 +++++++++++++++++++
 .../windowing/buffers/HeapWindowBuffer.java     |  94 -----------
 .../windowing/buffers/ListWindowBuffer.java     | 127 +++++++++++++++
 .../buffers/PreAggregatingHeapWindowBuffer.java |  99 -----------
 .../windowing/buffers/ReducingWindowBuffer.java | 121 ++++++++++++++
 .../windowing/buffers/WindowBuffer.java         |  15 +-
 .../windowing/buffers/WindowBufferFactory.java  |  29 ++--
 .../windowing/AllWindowTranslationTest.java     |  67 +++++++-
 .../EvictingNonKeyedWindowOperatorTest.java     |  12 +-
 .../windowing/NonKeyedWindowOperatorTest.java   |  43 ++---
 .../api/scala/AllWindowTranslationTest.scala    |  26 +--
 .../EventTimeAllWindowCheckpointingITCase.java  |  72 ++++++++
 17 files changed, 633 insertions(+), 317 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/75e03cac/flink-core/src/main/java/org/apache/flink/api/common/functions/FoldFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/FoldFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/FoldFunction.java
index 8194663..b52828e 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/FoldFunction.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/FoldFunction.java
@@ -40,7 +40,7 @@ import java.io.Serializable;
  * @param <O> Type of the elements that the group/list/stream contains
  */
 @Public
-public interface FoldFunction<O,T> extends Function, Serializable {
+public interface FoldFunction<O, T> extends Function, Serializable {
 	/**
 	 * The core method of FoldFunction, combining two values into one value of the same type.
 	 * The fold function is consecutively applied to all values of a group until only a single value remains.

http://git-wip-us.apache.org/repos/asf/flink/blob/75e03cac/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
index 6b32880..268dd8c 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
@@ -43,8 +43,9 @@ import org.apache.flink.streaming.api.windowing.triggers.Trigger;
 import org.apache.flink.streaming.api.windowing.windows.Window;
 import org.apache.flink.streaming.runtime.operators.windowing.EvictingNonKeyedWindowOperator;
 import org.apache.flink.streaming.runtime.operators.windowing.NonKeyedWindowOperator;
-import org.apache.flink.streaming.runtime.operators.windowing.buffers.HeapWindowBuffer;
-import org.apache.flink.streaming.runtime.operators.windowing.buffers.PreAggregatingHeapWindowBuffer;
+import org.apache.flink.streaming.runtime.operators.windowing.buffers.FoldingWindowBuffer;
+import org.apache.flink.streaming.runtime.operators.windowing.buffers.ListWindowBuffer;
+import org.apache.flink.streaming.runtime.operators.windowing.buffers.ReducingWindowBuffer;
 
 /**
  * A {@code AllWindowedStream} represents a data stream where the stream of
@@ -157,7 +158,7 @@ public class AllWindowedStream<T, W extends Window> {
 		if (evictor != null) {
 			operator = new EvictingNonKeyedWindowOperator<>(windowAssigner,
 					windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
-					new HeapWindowBuffer.Factory<T>(),
+					new ListWindowBuffer.Factory<>(getInputType().createSerializer(getExecutionEnvironment().getConfig())),
 					new ReduceIterableAllWindowFunction<W, T>(function),
 					trigger,
 					evictor);
@@ -165,7 +166,7 @@ public class AllWindowedStream<T, W extends Window> {
 		} else {
 			operator = new NonKeyedWindowOperator<>(windowAssigner,
 					windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
-					new PreAggregatingHeapWindowBuffer.Factory<>(function),
+					new ReducingWindowBuffer.Factory<>(function, getInputType().createSerializer(getExecutionEnvironment().getConfig())),
 					new ReduceIterableAllWindowFunction<W, T>(function),
 					trigger);
 		}
@@ -255,12 +256,12 @@ public class AllWindowedStream<T, W extends Window> {
 
 		String opName = "TriggerWindow(" + windowAssigner + ", " + trigger + ", " + udfName + ")";
 
-		NonKeyedWindowOperator<T, R, W> operator;
+		NonKeyedWindowOperator<T, T, R, W> operator;
 
 		if (evictor != null) {
 			operator = new EvictingNonKeyedWindowOperator<>(windowAssigner,
 					windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
-					new HeapWindowBuffer.Factory<T>(),
+					new ListWindowBuffer.Factory<>(getInputType().createSerializer(getExecutionEnvironment().getConfig())),
 					function,
 					trigger,
 					evictor);
@@ -268,7 +269,7 @@ public class AllWindowedStream<T, W extends Window> {
 		} else {
 			operator = new NonKeyedWindowOperator<>(windowAssigner,
 					windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
-					new HeapWindowBuffer.Factory<T>(),
+					new ListWindowBuffer.Factory<>(getInputType().createSerializer(getExecutionEnvironment().getConfig())),
 					function,
 					trigger);
 		}
@@ -329,7 +330,7 @@ public class AllWindowedStream<T, W extends Window> {
 		if (evictor != null) {
 			operator = new EvictingNonKeyedWindowOperator<>(windowAssigner,
 					windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
-					new HeapWindowBuffer.Factory<T>(),
+					new ListWindowBuffer.Factory<>(getInputType().createSerializer(getExecutionEnvironment().getConfig())),
 					new ReduceApplyAllWindowFunction<>(preAggregator, function),
 					trigger,
 					evictor);
@@ -337,8 +338,8 @@ public class AllWindowedStream<T, W extends Window> {
 		} else {
 			operator = new NonKeyedWindowOperator<>(windowAssigner,
 				windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
-				new PreAggregatingHeapWindowBuffer.Factory<>(preAggregator),
-				new ReduceApplyAllWindowFunction<>(preAggregator, function),
+				new ReducingWindowBuffer.Factory<>(preAggregator, getInputType().createSerializer(getExecutionEnvironment().getConfig())),
+				function,
 				trigger);
 		}
 
@@ -400,7 +401,7 @@ public class AllWindowedStream<T, W extends Window> {
 
 			operator = new EvictingNonKeyedWindowOperator<>(windowAssigner,
 				windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
-				new HeapWindowBuffer.Factory<T>(),
+				new ListWindowBuffer.Factory<>(getInputType().createSerializer(getExecutionEnvironment().getConfig())),
 				new FoldApplyAllWindowFunction<>(initialValue, foldFunction, function),
 				trigger,
 				evictor);
@@ -410,8 +411,8 @@ public class AllWindowedStream<T, W extends Window> {
 
 			operator = new NonKeyedWindowOperator<>(windowAssigner,
 				windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
-				new HeapWindowBuffer.Factory<T>(),
-				new FoldApplyAllWindowFunction<>(initialValue, foldFunction, function),
+				new FoldingWindowBuffer.Factory<>(foldFunction, initialValue, resultType.createSerializer(getExecutionEnvironment().getConfig())),
+				function,
 				trigger);
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/75e03cac/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingNonKeyedWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingNonKeyedWindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingNonKeyedWindowOperator.java
index 221367d..22d207d 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingNonKeyedWindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingNonKeyedWindowOperator.java
@@ -36,11 +36,12 @@ import static java.util.Objects.requireNonNull;
  * @see org.apache.flink.streaming.runtime.operators.windowing.EvictingWindowOperator
  *
  * @param <IN> The type of the incoming elements.
+ * @param <ACC> The type of elements stored in the window buffers.
  * @param <OUT> The type of elements emitted by the {@code WindowFunction}.
  * @param <W> The type of {@code Window} that the {@code WindowAssigner} assigns.
  */
 @Internal
-public class EvictingNonKeyedWindowOperator<IN, OUT, W extends Window> extends NonKeyedWindowOperator<IN, OUT, W> {
+public class EvictingNonKeyedWindowOperator<IN, ACC, OUT, W extends Window> extends NonKeyedWindowOperator<IN, ACC, OUT, W> {
 
 	private static final long serialVersionUID = 1L;
 
@@ -48,8 +49,8 @@ public class EvictingNonKeyedWindowOperator<IN, OUT, W extends Window> extends N
 
 	public EvictingNonKeyedWindowOperator(WindowAssigner<? super IN, W> windowAssigner,
 			TypeSerializer<W> windowSerializer,
-			WindowBufferFactory<? super IN, ? extends EvictingWindowBuffer<IN>> windowBufferFactory,
-			AllWindowFunction<IN, OUT, W> windowFunction,
+			WindowBufferFactory<? super IN, ACC, ? extends EvictingWindowBuffer<IN, ACC>> windowBufferFactory,
+			AllWindowFunction<ACC, OUT, W> windowFunction,
 			Trigger<? super IN, ? super W> trigger,
 			Evictor<? super IN, ? super W> evictor) {
 		super(windowAssigner, windowSerializer, windowBufferFactory, windowFunction, trigger);
@@ -60,7 +61,7 @@ public class EvictingNonKeyedWindowOperator<IN, OUT, W extends Window> extends N
 	@SuppressWarnings("unchecked, rawtypes")
 	protected void emitWindow(Context context) throws Exception {
 		timestampedCollector.setAbsoluteTimestamp(context.window.maxTimestamp());
-		EvictingWindowBuffer<IN> windowBuffer = (EvictingWindowBuffer<IN>) context.windowBuffer;
+		EvictingWindowBuffer<IN, ACC> windowBuffer = (EvictingWindowBuffer<IN, ACC>) context.windowBuffer;
 
 		int toEvict = 0;
 		if (windowBuffer.size() > 0) {

http://git-wip-us.apache.org/repos/asf/flink/blob/75e03cac/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
index 95feadc..6bd5c7d 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
@@ -45,7 +45,6 @@ import org.apache.flink.streaming.api.windowing.windows.Window;
 import org.apache.flink.streaming.runtime.operators.Triggerable;
 import org.apache.flink.streaming.runtime.operators.windowing.buffers.WindowBuffer;
 import org.apache.flink.streaming.runtime.operators.windowing.buffers.WindowBufferFactory;
-import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
 import org.apache.flink.util.InstantiationUtil;
@@ -72,12 +71,13 @@ import static java.util.Objects.requireNonNull;
  * @see org.apache.flink.streaming.runtime.operators.windowing.WindowOperator
  *
  * @param <IN> The type of the incoming elements.
+ * @param <ACC> The type of elements stored in the window buffers.
  * @param <OUT> The type of elements emitted by the {@code WindowFunction}.
  * @param <W> The type of {@code Window} that the {@code WindowAssigner} assigns.
  */
 @Internal
-public class NonKeyedWindowOperator<IN, OUT, W extends Window>
-		extends AbstractUdfStreamOperator<OUT, AllWindowFunction<IN, OUT, W>>
+public class NonKeyedWindowOperator<IN, ACC, OUT, W extends Window>
+		extends AbstractUdfStreamOperator<OUT, AllWindowFunction<ACC, OUT, W>>
 		implements OneInputStreamOperator<IN, OUT>, Triggerable, InputTypeConfigurable {
 
 	private static final long serialVersionUID = 1L;
@@ -92,7 +92,7 @@ public class NonKeyedWindowOperator<IN, OUT, W extends Window>
 
 	private final Trigger<? super IN, ? super W> trigger;
 
-	private final WindowBufferFactory<? super IN, ? extends WindowBuffer<IN>> windowBufferFactory;
+	private final WindowBufferFactory<? super IN, ACC, ? extends WindowBuffer<IN, ACC>> windowBufferFactory;
 
 	/**
 	 * This is used to copy the incoming element because it can be put into several window
@@ -145,8 +145,8 @@ public class NonKeyedWindowOperator<IN, OUT, W extends Window>
 	 */
 	public NonKeyedWindowOperator(WindowAssigner<? super IN, W> windowAssigner,
 			TypeSerializer<W> windowSerializer,
-			WindowBufferFactory<? super IN, ? extends WindowBuffer<IN>> windowBufferFactory,
-			AllWindowFunction<IN, OUT, W> windowFunction,
+			WindowBufferFactory<? super IN, ACC, ? extends WindowBuffer<IN, ACC>> windowBufferFactory,
+			AllWindowFunction<ACC, OUT, W> windowFunction,
 			Trigger<? super IN, ? super W> trigger) {
 
 		super(windowFunction);
@@ -180,9 +180,6 @@ public class NonKeyedWindowOperator<IN, OUT, W extends Window>
 			throw new IllegalStateException("Input serializer was not set.");
 		}
 
-		windowBufferFactory.setRuntimeContext(getRuntimeContext());
-		windowBufferFactory.open(getUserFunctionParameters());
-
 		// these could already be initialized from restoreState()
 		if (watermarkTimers == null) {
 			watermarkTimers = new HashMap<>();
@@ -221,11 +218,6 @@ public class NonKeyedWindowOperator<IN, OUT, W extends Window>
 	public final void dispose() {
 		super.dispose();
 		windows.clear();
-		try {
-			windowBufferFactory.close();
-		} catch (Exception e) {
-			throw new RuntimeException("Error while closing WindowBufferFactory.", e);
-		}
 	}
 
 	@Override
@@ -236,7 +228,7 @@ public class NonKeyedWindowOperator<IN, OUT, W extends Window>
 		for (W window: elementWindows) {
 			Context context = windows.get(window);
 			if (context == null) {
-				WindowBuffer<IN> windowBuffer = windowBufferFactory.create();
+				WindowBuffer<IN, ACC> windowBuffer = windowBufferFactory.create();
 				context = new Context(window, windowBuffer);
 				windows.put(window, context);
 			}
@@ -356,7 +348,7 @@ public class NonKeyedWindowOperator<IN, OUT, W extends Window>
 	protected class Context implements TriggerContext {
 		protected W window;
 
-		protected WindowBuffer<IN> windowBuffer;
+		protected WindowBuffer<IN, ACC> windowBuffer;
 
 		protected HashMap<String, Serializable> state;
 
@@ -369,7 +361,7 @@ public class NonKeyedWindowOperator<IN, OUT, W extends Window>
 
 		public Context(
 				W window,
-				WindowBuffer<IN> windowBuffer) {
+				WindowBuffer<IN, ACC> windowBuffer) {
 			this.window = window;
 			this.windowBuffer = windowBuffer;
 			state = new HashMap<>();
@@ -394,12 +386,7 @@ public class NonKeyedWindowOperator<IN, OUT, W extends Window>
 			in.read(stateData);
 			state = InstantiationUtil.deserializeObject(stateData, userClassloader);
 
-			this.windowBuffer = windowBufferFactory.create();
-			int numElements = in.readInt();
-			MultiplexingStreamRecordSerializer<IN> recordSerializer = new MultiplexingStreamRecordSerializer<>(inputSerializer);
-			for (int i = 0; i < numElements; i++) {
-				windowBuffer.storeElement(recordSerializer.deserialize(in).<IN>asRecord());
-			}
+			this.windowBuffer = windowBufferFactory.restoreFromSnapshot(in);
 		}
 
 		protected void writeToState(AbstractStateBackend.CheckpointStateOutputView out) throws IOException {
@@ -411,11 +398,7 @@ public class NonKeyedWindowOperator<IN, OUT, W extends Window>
 			out.writeInt(serializedState.length);
 			out.write(serializedState, 0, serializedState.length);
 
-			MultiplexingStreamRecordSerializer<IN> recordSerializer = new MultiplexingStreamRecordSerializer<>(inputSerializer);
-			out.writeInt(windowBuffer.size());
-			for (StreamRecord<IN> element: windowBuffer.getElements()) {
-				recordSerializer.serialize(element, out);
-			}
+			windowBuffer.snapshot(out);
 		}
 
 		@Override
@@ -635,7 +618,7 @@ public class NonKeyedWindowOperator<IN, OUT, W extends Window>
 	}
 
 	@VisibleForTesting
-	public WindowBufferFactory<? super IN, ? extends WindowBuffer<IN>> getWindowBufferFactory() {
+	public WindowBufferFactory<? super IN, ACC, ? extends WindowBuffer<IN, ACC>> getWindowBufferFactory() {
 		return windowBufferFactory;
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/75e03cac/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/EvictingWindowBuffer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/EvictingWindowBuffer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/EvictingWindowBuffer.java
index 25a8211..75f646d 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/EvictingWindowBuffer.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/EvictingWindowBuffer.java
@@ -25,9 +25,10 @@ import org.apache.flink.annotation.Internal;
  * the buffer.
  *
  * @param <T> The type of elements that this {@code WindowBuffer} can store.
+ * @param <O> The type of elements that this window buffer will return when asked for its contents.
  */
 @Internal
-public interface EvictingWindowBuffer<T> extends WindowBuffer<T> {
+public interface EvictingWindowBuffer<T, O> extends WindowBuffer<T, O> {
 
 	/**
 	 * Removes the given number of elements, starting from the beginning.

http://git-wip-us.apache.org/repos/asf/flink/blob/75e03cac/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/FoldingWindowBuffer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/FoldingWindowBuffer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/FoldingWindowBuffer.java
new file mode 100644
index 0000000..fa44f9d
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/FoldingWindowBuffer.java
@@ -0,0 +1,163 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators.windowing.buffers;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.functions.FoldFunction;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.Collections;
+
+/**
+ * An {@link WindowBuffer} that stores elements on the Java Heap. This buffer uses a
+ * {@link FoldFunction} to incrementally aggregate elements that are added to the buffer.
+ *
+ * @param <T> The type of elements that can be added to this {@code WindowBuffer}.
+ * @param <ACC> The type of the accumulator that this {@code WindowBuffer} can store.
+ */
+@Internal
+public class FoldingWindowBuffer<T, ACC> implements WindowBuffer<T, ACC> {
+
+	private final FoldFunction<T, ACC> foldFunction;
+	private final TypeSerializer<ACC> accSerializer;
+	private StreamRecord<ACC> data;
+
+	protected FoldingWindowBuffer(FoldFunction<T, ACC> foldFunction, ACC initialAccumulator, TypeSerializer<ACC> accSerializer) {
+		this.foldFunction = foldFunction;
+		this.accSerializer = accSerializer;
+		this.data = new StreamRecord<>(initialAccumulator);
+	}
+
+	protected FoldingWindowBuffer(FoldFunction<T, ACC> foldFunction, StreamRecord<ACC> initialAccumulator, TypeSerializer<ACC> accSerializer) {
+		this.foldFunction = foldFunction;
+		this.accSerializer = accSerializer;
+		this.data = initialAccumulator;
+	}
+
+	@Override
+	public void storeElement(StreamRecord<T> element) throws Exception {
+		data.replace(foldFunction.fold(data.getValue(), element.getValue()), element.getTimestamp());
+	}
+
+	@Override
+	public Iterable<StreamRecord<ACC>> getElements() {
+		return Collections.singleton(data);
+	}
+
+	@Override
+	public Iterable<ACC> getUnpackedElements() {
+		return Collections.singleton(data.getValue());
+	}
+
+	@Override
+	public int size() {
+		return 1;
+	}
+
+	@Override
+	public void snapshot(DataOutputView out) throws IOException {
+		MultiplexingStreamRecordSerializer<ACC> recordSerializer = new MultiplexingStreamRecordSerializer<>(accSerializer);
+		recordSerializer.serialize(data, out);
+	}
+
+	public static class Factory<T, ACC> implements WindowBufferFactory<T, ACC, FoldingWindowBuffer<T, ACC>> {
+		private static final long serialVersionUID = 1L;
+
+		private final FoldFunction<T, ACC> foldFunction;
+
+		private final TypeSerializer<ACC> accSerializer;
+
+		private transient ACC initialAccumulator;
+
+		public Factory(FoldFunction<T, ACC> foldFunction, ACC initialValue, TypeSerializer<ACC> accSerializer) {
+			this.foldFunction = foldFunction;
+			this.accSerializer = accSerializer;
+			this.initialAccumulator = initialValue;
+		}
+
+		@Override
+		public FoldingWindowBuffer<T, ACC> create() {
+			return new FoldingWindowBuffer<>(foldFunction, accSerializer.copy(initialAccumulator), accSerializer);
+		}
+
+		@Override
+		public FoldingWindowBuffer<T, ACC> restoreFromSnapshot(DataInputView in) throws IOException {
+			MultiplexingStreamRecordSerializer<ACC> recordSerializer = new MultiplexingStreamRecordSerializer<>(accSerializer);
+			StreamElement element = recordSerializer.deserialize(in);
+			return new FoldingWindowBuffer<>(foldFunction, element.<ACC>asRecord(), accSerializer);
+		}
+
+		private void writeObject(final ObjectOutputStream out) throws IOException {
+			// write all the non-transient fields
+			out.defaultWriteObject();
+
+
+			byte[] serializedDefaultValue;
+			try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
+					DataOutputViewStreamWrapper outView = new DataOutputViewStreamWrapper(baos))
+			{
+				accSerializer.serialize(initialAccumulator, outView);
+
+				outView.flush();
+				serializedDefaultValue = baos.toByteArray();
+			}
+			catch (Exception e) {
+				throw new IOException("Unable to serialize initial accumulator of type " +
+						initialAccumulator.getClass().getSimpleName() + ".", e);
+			}
+
+			out.writeInt(serializedDefaultValue.length);
+			out.write(serializedDefaultValue);
+		}
+
+		private void readObject(final ObjectInputStream in) throws IOException, ClassNotFoundException {
+			// read the non-transient fields
+			in.defaultReadObject();
+
+			// read the default value field
+			int size = in.readInt();
+			byte[] buffer = new byte[size];
+			int bytesRead = in.read(buffer);
+
+			if (bytesRead != size) {
+				throw new RuntimeException("Read size does not match expected size.");
+			}
+
+			try (ByteArrayInputStream bais = new ByteArrayInputStream(buffer);
+					DataInputViewStreamWrapper inView = new DataInputViewStreamWrapper(bais))
+			{
+				initialAccumulator = accSerializer.deserialize(inView);
+			}
+			catch (Exception e) {
+				throw new IOException("Unable to deserialize initial accumulator.", e);
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/75e03cac/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/HeapWindowBuffer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/HeapWindowBuffer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/HeapWindowBuffer.java
deleted file mode 100644
index 9db449b..0000000
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/HeapWindowBuffer.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.runtime.operators.windowing.buffers;
-
-import com.google.common.base.Function;
-import com.google.common.collect.FluentIterable;
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-
-import java.util.ArrayDeque;
-
-/**
- * An {@link EvictingWindowBuffer} that stores elements on the Java Heap.
- *
- * @param <T> The type of elements that this {@code WindowBuffer} can store.
- */
-@Internal
-public class HeapWindowBuffer<T> implements EvictingWindowBuffer<T> {
-	private static final long serialVersionUID = 1L;
-
-	private ArrayDeque<StreamRecord<T>> elements;
-
-	protected HeapWindowBuffer() {
-		this.elements = new ArrayDeque<>();
-	}
-
-	@Override
-	public void storeElement(StreamRecord<T> element) {
-		elements.add(element);
-	}
-
-	@Override
-	public void removeElements(int count) {
-		// TODO determine if this can be done in a better way
-		for (int i = 0; i < count; i++) {
-			elements.removeFirst();
-		}
-	}
-
-	@Override
-	public Iterable<StreamRecord<T>> getElements() {
-		return elements;
-	}
-
-	@Override
-	public Iterable<T> getUnpackedElements() {
-		return FluentIterable.from(elements).transform(new Function<StreamRecord<T>, T>() {
-			@Override
-			public T apply(StreamRecord<T> record) {
-				return record.getValue();
-			}
-		});
-	}
-
-	@Override
-	public int size() {
-		return elements.size();
-	}
-
-	public static class Factory<T> implements WindowBufferFactory<T, HeapWindowBuffer<T>> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public void setRuntimeContext(RuntimeContext ctx) {}
-
-		@Override
-		public void open(Configuration config) {}
-
-		@Override
-		public void close() {}
-
-		@Override
-		public HeapWindowBuffer<T> create() {
-			return new HeapWindowBuffer<>();
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/75e03cac/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/ListWindowBuffer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/ListWindowBuffer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/ListWindowBuffer.java
new file mode 100644
index 0000000..5b9dd3c
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/ListWindowBuffer.java
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators.windowing.buffers;
+
+import com.google.common.base.Function;
+import com.google.common.collect.FluentIterable;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import java.io.IOException;
+import java.util.ArrayDeque;
+
+/**
+ * An {@link EvictingWindowBuffer} that stores elements on the Java Heap.
+ *
+ * @param <T> The type of elements that this {@code WindowBuffer} can store.
+ */
+@Internal
+public class ListWindowBuffer<T> implements EvictingWindowBuffer<T, T> {
+
+	private final TypeSerializer<T>  serializer;
+
+	private ArrayDeque<StreamRecord<T>> elements;
+
+	protected ListWindowBuffer(TypeSerializer<T> serializer) {
+		this.serializer = serializer;
+		this.elements = new ArrayDeque<>();
+	}
+
+	protected ListWindowBuffer(ArrayDeque<StreamRecord<T>> elements, TypeSerializer<T> serializer) {
+		this.serializer = serializer;
+		this.elements = elements;
+	}
+
+	@Override
+	public void storeElement(StreamRecord<T> element) {
+		elements.add(element);
+	}
+
+	@Override
+	public void removeElements(int count) {
+		// TODO determine if this can be done in a better way
+		for (int i = 0; i < count; i++) {
+			elements.removeFirst();
+		}
+	}
+
+	@Override
+	public Iterable<StreamRecord<T>> getElements() {
+		return elements;
+	}
+
+	@Override
+	public Iterable<T> getUnpackedElements() {
+		return FluentIterable.from(elements).transform(new Function<StreamRecord<T>, T>() {
+			@Override
+			public T apply(StreamRecord<T> record) {
+				return record.getValue();
+			}
+		});
+	}
+
+	@Override
+	public int size() {
+		return elements.size();
+	}
+
+	@Override
+	public void snapshot(DataOutputView out) throws IOException {
+		out.writeInt(elements.size());
+
+		MultiplexingStreamRecordSerializer<T> recordSerializer = new MultiplexingStreamRecordSerializer<>(serializer);
+
+		for (StreamRecord<T> e: elements) {
+			recordSerializer.serialize(e, out);
+		}
+	}
+
+	public static class Factory<T> implements WindowBufferFactory<T, T, ListWindowBuffer<T>> {
+		private static final long serialVersionUID = 1L;
+
+		private final TypeSerializer<T> serializer;
+
+		public Factory(TypeSerializer<T> serializer) {
+			this.serializer = serializer;
+		}
+
+		@Override
+		public ListWindowBuffer<T> create() {
+			return new ListWindowBuffer<>(serializer);
+		}
+
+		@Override
+		public ListWindowBuffer<T> restoreFromSnapshot(DataInputView in) throws IOException {
+			int size = in.readInt();
+
+			MultiplexingStreamRecordSerializer<T> recordSerializer = new MultiplexingStreamRecordSerializer<>(serializer);
+
+			ArrayDeque<StreamRecord<T>> elements = new ArrayDeque<>();
+
+			for (int i = 0; i < size; i++) {
+				elements.add(recordSerializer.deserialize(in).<T>asRecord());
+			}
+
+			return new ListWindowBuffer<>(elements, serializer);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/75e03cac/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/PreAggregatingHeapWindowBuffer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/PreAggregatingHeapWindowBuffer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/PreAggregatingHeapWindowBuffer.java
deleted file mode 100644
index 5f8de4b..0000000
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/PreAggregatingHeapWindowBuffer.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.runtime.operators.windowing.buffers;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.api.common.functions.util.FunctionUtils;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-
-import java.util.Collections;
-
-/**
- * An {@link WindowBuffer} that stores elements on the Java Heap. This buffer uses a
- * {@link ReduceFunction} to pre-aggregate elements that are added to the buffer.
- *
- * @param <T> The type of elements that this {@code WindowBuffer} can store.
- */
-@Internal
-public class PreAggregatingHeapWindowBuffer<T> implements WindowBuffer<T> {
-	private static final long serialVersionUID = 1L;
-
-	private final ReduceFunction<T> reduceFunction;
-	private transient StreamRecord<T> data;
-
-	protected PreAggregatingHeapWindowBuffer(ReduceFunction<T> reduceFunction) {
-		this.reduceFunction = reduceFunction;
-	}
-
-	@Override
-	public void storeElement(StreamRecord<T> element) throws Exception {
-		if (data == null) {
-			data = new StreamRecord<>(element.getValue(), element.getTimestamp());
-		} else {
-			data.replace(reduceFunction.reduce(data.getValue(), element.getValue()));
-		}
-	}
-
-	@Override
-	public Iterable<StreamRecord<T>> getElements() {
-		return Collections.singleton(data);
-	}
-
-	@Override
-	public Iterable<T> getUnpackedElements() {
-		return Collections.singleton(data.getValue());
-	}
-
-	@Override
-	public int size() {
-		return 1;
-	}
-
-	public static class Factory<T> implements WindowBufferFactory<T, PreAggregatingHeapWindowBuffer<T>> {
-		private static final long serialVersionUID = 1L;
-
-		private final ReduceFunction<T> reduceFunction;
-
-		public Factory(ReduceFunction<T> reduceFunction) {
-			this.reduceFunction = reduceFunction;
-		}
-
-		@Override
-		public void setRuntimeContext(RuntimeContext ctx) {
-			FunctionUtils.setFunctionRuntimeContext(reduceFunction, ctx);
-		}
-
-		@Override
-		public void open(Configuration config) throws Exception {
-			FunctionUtils.openFunction(reduceFunction, config);
-		}
-
-		@Override
-		public void close() throws Exception {
-			FunctionUtils.closeFunction(reduceFunction);
-		}
-
-		@Override
-		public PreAggregatingHeapWindowBuffer<T> create() {
-			return new PreAggregatingHeapWindowBuffer<>(reduceFunction);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/75e03cac/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/ReducingWindowBuffer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/ReducingWindowBuffer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/ReducingWindowBuffer.java
new file mode 100644
index 0000000..1f2b639
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/ReducingWindowBuffer.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators.windowing.buffers;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import java.io.IOException;
+import java.util.Collections;
+
+/**
+ * An {@link WindowBuffer} that stores elements on the Java Heap. This buffer uses a
+ * {@link ReduceFunction} to incrementally aggregate elements that are added to the buffer.
+ *
+ * @param <T> The type of elements that this {@code WindowBuffer} can store.
+ */
+@Internal
+public class ReducingWindowBuffer<T> implements WindowBuffer<T, T> {
+
+	private final ReduceFunction<T> reduceFunction;
+	private final TypeSerializer<T> serializer;
+	private  StreamRecord<T> data;
+
+	protected ReducingWindowBuffer(ReduceFunction<T> reduceFunction, TypeSerializer<T> serializer) {
+		this.reduceFunction = reduceFunction;
+		this.serializer = serializer;
+		this.data = null;
+	}
+
+	protected ReducingWindowBuffer(ReduceFunction<T> reduceFunction, StreamRecord<T> data, TypeSerializer<T> serializer) {
+		this.reduceFunction = reduceFunction;
+		this.serializer = serializer;
+		this.data = data;
+	}
+
+	@Override
+	public void storeElement(StreamRecord<T> element) throws Exception {
+		if (data == null) {
+			data = new StreamRecord<>(element.getValue(), element.getTimestamp());
+		} else {
+			data.replace(reduceFunction.reduce(data.getValue(), element.getValue()));
+		}
+	}
+
+	@Override
+	public Iterable<StreamRecord<T>> getElements() {
+		return Collections.singleton(data);
+	}
+
+	@Override
+	public Iterable<T> getUnpackedElements() {
+		return Collections.singleton(data.getValue());
+	}
+
+	@Override
+	public int size() {
+		return 1;
+	}
+
+	@Override
+	public void snapshot(DataOutputView out) throws IOException {
+		if (data != null) {
+			out.writeBoolean(true);
+			MultiplexingStreamRecordSerializer<T> recordSerializer = new MultiplexingStreamRecordSerializer<>(serializer);
+			recordSerializer.serialize(data, out);
+		} else {
+			out.writeBoolean(false);
+		}
+	}
+
+	public static class Factory<T> implements WindowBufferFactory<T, T, ReducingWindowBuffer<T>> {
+		private static final long serialVersionUID = 1L;
+
+		private final ReduceFunction<T> reduceFunction;
+
+		private final TypeSerializer<T> serializer;
+
+		public Factory(ReduceFunction<T> reduceFunction, TypeSerializer<T> serializer) {
+			this.reduceFunction = reduceFunction;
+			this.serializer = serializer;
+		}
+
+		@Override
+		public ReducingWindowBuffer<T> create() {
+			return new ReducingWindowBuffer<>(reduceFunction, serializer);
+		}
+
+		@Override
+		public ReducingWindowBuffer<T> restoreFromSnapshot(DataInputView in) throws IOException {
+			boolean hasValue = in.readBoolean();
+			if (hasValue) {
+				MultiplexingStreamRecordSerializer<T> recordSerializer = new MultiplexingStreamRecordSerializer<>(serializer);
+				StreamElement element = recordSerializer.deserialize(in);
+				return new ReducingWindowBuffer<>(reduceFunction, element.<T>asRecord(), serializer);
+			} else {
+				return new ReducingWindowBuffer<>(reduceFunction, serializer);
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/75e03cac/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/WindowBuffer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/WindowBuffer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/WindowBuffer.java
index cbf7dda..16be0f3 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/WindowBuffer.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/WindowBuffer.java
@@ -19,10 +19,11 @@ package org.apache.flink.streaming.runtime.operators.windowing.buffers;
 
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.streaming.api.windowing.windows.Window;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
-import java.io.Serializable;
+import java.io.IOException;
 
 /**
  * A {@code WindowBuffer} is used by
@@ -37,9 +38,10 @@ import java.io.Serializable;
  * have their own instance of the {@code Evictor}.
  *
  * @param <T> The type of elements that this {@code WindowBuffer} can store.
+ * @param <O> The type of elements that this window buffer will return when asked for its contents.
  */
 @Internal
-public interface WindowBuffer<T> extends Serializable {
+public interface WindowBuffer<T, O> {
 
 	/**
 	 * Adds the element to the buffer.
@@ -51,16 +53,21 @@ public interface WindowBuffer<T> extends Serializable {
 	/**
 	 * Returns all elements that are currently in the buffer.
 	 */
-	Iterable<StreamRecord<T>> getElements();
+	Iterable<StreamRecord<O>> getElements();
 
 	/**
 	 * Returns all elements that are currently in the buffer. This will unwrap the contained
 	 * elements from their {@link StreamRecord}.
 	 */
-	Iterable<T> getUnpackedElements();
+	Iterable<O> getUnpackedElements();
 
 	/**
 	 * Returns the number of elements that are currently in the buffer.
 	 */
 	int size();
+
+	/**
+	 * Writes the contents of the window buffer to a {@link DataOutputView} for checkpointing.
+	 */
+	void snapshot(DataOutputView out) throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/75e03cac/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/WindowBufferFactory.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/WindowBufferFactory.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/WindowBufferFactory.java
index a4f4b27..1ca6350 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/WindowBufferFactory.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/WindowBufferFactory.java
@@ -18,39 +18,30 @@
 package org.apache.flink.streaming.runtime.operators.windowing.buffers;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
 
+import java.io.IOException;
 import java.io.Serializable;
 
 /**
  * A factory for {@link WindowBuffer WindowBuffers}.
  *
  * @param <T> The type of elements that the created {@code WindowBuffer} can store.
+ * @param <O> The type of elements that the created buffer will return when asked for its contents.
  * @param <B> The type of the created {@code WindowBuffer}
  */
 @Internal
-public interface WindowBufferFactory<T, B extends WindowBuffer<T>> extends Serializable {
+public interface WindowBufferFactory<T, O, B extends WindowBuffer<T, O>> extends Serializable {
 
 	/**
-	 * Sets the {@link RuntimeContext} that is used to initialize eventual user functions
-	 * inside the created buffers.
-	 */
-	void setRuntimeContext(RuntimeContext ctx);
-
-	/**
-	 * Calls {@code open()} on eventual user functions inside the buffer.
-	 */
-	void open(Configuration config) throws Exception;
-
-	/**
-	 * Calls {@code close()} on eventual user functions inside the buffer.
+	 * Creates a new {@code WindowBuffer}.
 	 */
-
-	void close() throws Exception;
+	B create();
 
 	/**
-	 * Creates a new {@code WindowBuffer}.
+	 * Restores a {@code WindowBuffer} from a previous snapshot written using
+	 * {@link WindowBuffer#snapshot(DataOutputView)}.
 	 */
-	B create();
+	B restoreFromSnapshot(DataInputView in) throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/75e03cac/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java
index 42f452c..f6e3dcc 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java
@@ -17,6 +17,7 @@
  */
 package org.apache.flink.streaming.runtime.operators.windowing;
 
+import org.apache.flink.api.common.functions.FoldFunction;
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.streaming.api.TimeCharacteristic;
@@ -33,8 +34,9 @@ import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
 import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
-import org.apache.flink.streaming.runtime.operators.windowing.buffers.HeapWindowBuffer;
-import org.apache.flink.streaming.runtime.operators.windowing.buffers.PreAggregatingHeapWindowBuffer;
+import org.apache.flink.streaming.runtime.operators.windowing.buffers.FoldingWindowBuffer;
+import org.apache.flink.streaming.runtime.operators.windowing.buffers.ListWindowBuffer;
+import org.apache.flink.streaming.runtime.operators.windowing.buffers.ReducingWindowBuffer;
 import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
 import org.apache.flink.util.Collector;
 import org.junit.Assert;
@@ -72,7 +74,7 @@ public class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase
 		NonKeyedWindowOperator winOperator1 = (NonKeyedWindowOperator) operator1;
 		Assert.assertTrue(winOperator1.getTrigger() instanceof EventTimeTrigger);
 		Assert.assertTrue(winOperator1.getWindowAssigner() instanceof SlidingTimeWindows);
-		Assert.assertTrue(winOperator1.getWindowBufferFactory() instanceof PreAggregatingHeapWindowBuffer.Factory);
+		Assert.assertTrue(winOperator1.getWindowBufferFactory() instanceof ReducingWindowBuffer.Factory);
 
 		DataStream<Tuple2<String, Integer>> window2 = source
 				.windowAll(TumblingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
@@ -94,7 +96,7 @@ public class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase
 		NonKeyedWindowOperator winOperator2 = (NonKeyedWindowOperator) operator2;
 		Assert.assertTrue(winOperator2.getTrigger() instanceof EventTimeTrigger);
 		Assert.assertTrue(winOperator2.getWindowAssigner() instanceof TumblingTimeWindows);
-		Assert.assertTrue(winOperator2.getWindowBufferFactory() instanceof HeapWindowBuffer.Factory);
+		Assert.assertTrue(winOperator2.getWindowBufferFactory() instanceof ListWindowBuffer.Factory);
 	}
 
 	@Test
@@ -118,7 +120,7 @@ public class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase
 		NonKeyedWindowOperator winOperator1 = (NonKeyedWindowOperator) operator1;
 		Assert.assertTrue(winOperator1.getTrigger() instanceof CountTrigger);
 		Assert.assertTrue(winOperator1.getWindowAssigner() instanceof SlidingTimeWindows);
-		Assert.assertTrue(winOperator1.getWindowBufferFactory() instanceof PreAggregatingHeapWindowBuffer.Factory);
+		Assert.assertTrue(winOperator1.getWindowBufferFactory() instanceof ReducingWindowBuffer.Factory);
 
 		DataStream<Tuple2<String, Integer>> window2 = source
 				.windowAll(TumblingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
@@ -141,7 +143,7 @@ public class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase
 		NonKeyedWindowOperator winOperator2 = (NonKeyedWindowOperator) operator2;
 		Assert.assertTrue(winOperator2.getTrigger() instanceof CountTrigger);
 		Assert.assertTrue(winOperator2.getWindowAssigner() instanceof TumblingTimeWindows);
-		Assert.assertTrue(winOperator2.getWindowBufferFactory() instanceof HeapWindowBuffer.Factory);
+		Assert.assertTrue(winOperator2.getWindowBufferFactory() instanceof ListWindowBuffer.Factory);
 	}
 
 	@Test
@@ -166,7 +168,7 @@ public class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase
 		Assert.assertTrue(winOperator1.getTrigger() instanceof EventTimeTrigger);
 		Assert.assertTrue(winOperator1.getWindowAssigner() instanceof SlidingTimeWindows);
 		Assert.assertTrue(winOperator1.getEvictor() instanceof CountEvictor);
-		Assert.assertTrue(winOperator1.getWindowBufferFactory() instanceof HeapWindowBuffer.Factory);
+		Assert.assertTrue(winOperator1.getWindowBufferFactory() instanceof ListWindowBuffer.Factory);
 
 		DataStream<Tuple2<String, Integer>> window2 = source
 				.windowAll(TumblingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
@@ -191,7 +193,46 @@ public class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase
 		Assert.assertTrue(winOperator2.getTrigger() instanceof CountTrigger);
 		Assert.assertTrue(winOperator2.getWindowAssigner() instanceof TumblingTimeWindows);
 		Assert.assertTrue(winOperator2.getEvictor() instanceof TimeEvictor);
-		Assert.assertTrue(winOperator2.getWindowBufferFactory() instanceof HeapWindowBuffer.Factory);
+		Assert.assertTrue(winOperator2.getWindowBufferFactory() instanceof ListWindowBuffer.Factory);
+	}
+
+	/**
+	 * These tests ensure that a Fold buffer is used if possible
+	 */
+	@Test
+	@SuppressWarnings("rawtypes")
+	public void testFoldBuffer() throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+		env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+
+		DummyFolder folder = new DummyFolder();
+
+		DataStream<Integer> window1 = source
+				.windowAll(SlidingTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
+				.fold(0, folder);
+
+		OneInputTransformation<Tuple2<String, Integer>, Integer> transform1 = (OneInputTransformation<Tuple2<String, Integer>, Integer>) window1.getTransformation();
+		OneInputStreamOperator<Tuple2<String, Integer>, Integer> operator1 = transform1.getOperator();
+		Assert.assertTrue(operator1 instanceof NonKeyedWindowOperator);
+		NonKeyedWindowOperator winOperator1 = (NonKeyedWindowOperator) operator1;
+		Assert.assertTrue(winOperator1.getTrigger() instanceof EventTimeTrigger);
+		Assert.assertTrue(winOperator1.getWindowAssigner() instanceof SlidingTimeWindows);
+		Assert.assertTrue(winOperator1.getWindowBufferFactory() instanceof FoldingWindowBuffer.Factory);
+
+		DataStream<Integer> window2 = source
+				.windowAll(TumblingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
+				.evictor(CountEvictor.of(13))
+				.fold(0, folder);
+
+		OneInputTransformation<Tuple2<String, Integer>, Integer> transform2 = (OneInputTransformation<Tuple2<String, Integer>, Integer>) window2.getTransformation();
+		OneInputStreamOperator<Tuple2<String, Integer>, Integer> operator2 = transform2.getOperator();
+		Assert.assertTrue(operator2 instanceof NonKeyedWindowOperator);
+		NonKeyedWindowOperator winOperator2 = (NonKeyedWindowOperator) operator2;
+		Assert.assertTrue(winOperator2.getTrigger() instanceof EventTimeTrigger);
+		Assert.assertTrue(winOperator2.getWindowAssigner() instanceof TumblingTimeWindows);
+		Assert.assertTrue(winOperator2.getWindowBufferFactory() instanceof ListWindowBuffer.Factory);
 	}
 
 	// ------------------------------------------------------------------------
@@ -206,4 +247,14 @@ public class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase
 			return value1;
 		}
 	}
+
+	public static class DummyFolder implements FoldFunction<Tuple2<String, Integer>, Integer> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public Integer fold(Integer accumulator, Tuple2<String, Integer> value) throws Exception {
+			return accumulator;
+		}
+	}
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/75e03cac/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingNonKeyedWindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingNonKeyedWindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingNonKeyedWindowOperatorTest.java
index 571838f..c3a36dd 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingNonKeyedWindowOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingNonKeyedWindowOperatorTest.java
@@ -19,6 +19,7 @@ package org.apache.flink.streaming.runtime.operators.windowing;
 
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.RichReduceFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.TypeInfoParser;
@@ -28,11 +29,10 @@ import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
 import org.apache.flink.streaming.api.windowing.evictors.CountEvictor;
 import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
 import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
-import org.apache.flink.streaming.runtime.operators.windowing.buffers.HeapWindowBuffer;
+import org.apache.flink.streaming.runtime.operators.windowing.buffers.ListWindowBuffer;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.TestHarnessUtil;
-import org.junit.Assert;
 import org.junit.Test;
 
 import java.util.Comparator;
@@ -51,15 +51,17 @@ public class EvictingNonKeyedWindowOperatorTest {
 		final int WINDOW_SIZE = 4;
 		final int WINDOW_SLIDE = 2;
 
-		EvictingNonKeyedWindowOperator<Tuple2<String, Integer>, Tuple2<String, Integer>, GlobalWindow> operator = new EvictingNonKeyedWindowOperator<>(
+		TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");
+
+		EvictingNonKeyedWindowOperator<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>, GlobalWindow> operator = new EvictingNonKeyedWindowOperator<>(
 				GlobalWindows.create(),
 				new GlobalWindow.Serializer(),
-				new HeapWindowBuffer.Factory<Tuple2<String, Integer>>(),
+				new ListWindowBuffer.Factory<Tuple2<String, Integer>>(inputType.createSerializer(new ExecutionConfig())),
 				new ReduceIterableAllWindowFunction<GlobalWindow, Tuple2<String, Integer>>(new SumReducer()),
 				CountTrigger.of(WINDOW_SLIDE),
 				CountEvictor.of(WINDOW_SIZE));
 
-		operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig());
+		operator.setInputType(inputType, new ExecutionConfig());
 
 
 		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =

http://git-wip-us.apache.org/repos/asf/flink/blob/75e03cac/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperatorTest.java
index c0e6ad4..406f3b0 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperatorTest.java
@@ -20,8 +20,12 @@ package org.apache.flink.streaming.runtime.operators.windowing;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.functions.RichReduceFunction;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.TypeInfoParser;
+import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.functions.windowing.ReduceIterableAllWindowFunction;
 import org.apache.flink.streaming.api.watermark.Watermark;
@@ -35,8 +39,8 @@ import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger;
 import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
 import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
-import org.apache.flink.streaming.runtime.operators.windowing.buffers.HeapWindowBuffer;
-import org.apache.flink.streaming.runtime.operators.windowing.buffers.PreAggregatingHeapWindowBuffer;
+import org.apache.flink.streaming.runtime.operators.windowing.buffers.ListWindowBuffer;
+import org.apache.flink.streaming.runtime.operators.windowing.buffers.ReducingWindowBuffer;
 import org.apache.flink.streaming.runtime.operators.windowing.buffers.WindowBufferFactory;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
@@ -59,7 +63,7 @@ public class NonKeyedWindowOperatorTest {
 	@SuppressWarnings("unchecked,rawtypes")
 	private WindowBufferFactory windowBufferFactory;
 
-	public NonKeyedWindowOperatorTest(WindowBufferFactory<?, ?> windowBufferFactory) {
+	public NonKeyedWindowOperatorTest(WindowBufferFactory<?, ?, ?> windowBufferFactory) {
 		this.windowBufferFactory = windowBufferFactory;
 	}
 
@@ -74,7 +78,7 @@ public class NonKeyedWindowOperatorTest {
 		final int WINDOW_SIZE = 3;
 		final int WINDOW_SLIDE = 1;
 
-		NonKeyedWindowOperator<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> operator = new NonKeyedWindowOperator<>(
+		NonKeyedWindowOperator<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> operator = new NonKeyedWindowOperator<>(
 				SlidingTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS), Time.of(WINDOW_SLIDE, TimeUnit.SECONDS)),
 				new TimeWindow.Serializer(),
 				windowBufferFactory,
@@ -150,7 +154,7 @@ public class NonKeyedWindowOperatorTest {
 
 		final int WINDOW_SIZE = 3;
 
-		NonKeyedWindowOperator<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> operator = new NonKeyedWindowOperator<>(
+		NonKeyedWindowOperator<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> operator = new NonKeyedWindowOperator<>(
 				TumblingTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)),
 				new TimeWindow.Serializer(),
 				windowBufferFactory,
@@ -224,7 +228,7 @@ public class NonKeyedWindowOperatorTest {
 
 		final int WINDOW_SIZE = 3;
 
-		NonKeyedWindowOperator<Tuple2<String, Integer>, Tuple2<String, Integer>, GlobalWindow> operator = new NonKeyedWindowOperator<>(
+		NonKeyedWindowOperator<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>, GlobalWindow> operator = new NonKeyedWindowOperator<>(
 				GlobalWindows.create(),
 				new GlobalWindow.Serializer(),
 				windowBufferFactory,
@@ -298,7 +302,7 @@ public class NonKeyedWindowOperatorTest {
 
 		final int WINDOW_SIZE = 4;
 
-		NonKeyedWindowOperator<Tuple2<String, Integer>, Tuple2<String, Integer>, GlobalWindow> operator = new NonKeyedWindowOperator<>(
+		NonKeyedWindowOperator<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>, GlobalWindow> operator = new NonKeyedWindowOperator<>(
 				GlobalWindows.create(),
 				new GlobalWindow.Serializer(),
 				windowBufferFactory,
@@ -360,26 +364,9 @@ public class NonKeyedWindowOperatorTest {
 	public static class RichSumReducer extends RichReduceFunction<Tuple2<String, Integer>> {
 		private static final long serialVersionUID = 1L;
 
-		private boolean openCalled = false;
-
-		@Override
-		public void open(Configuration parameters) throws Exception {
-			super.open(parameters);
-			openCalled = true;
-		}
-
-		@Override
-		public void close() throws Exception {
-			super.close();
-			closeCalled.incrementAndGet();
-		}
-
 		@Override
 		public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1,
 				Tuple2<String, Integer> value2) throws Exception {
-			if (!openCalled) {
-				Assert.fail("Open was not called");
-			}
 			return new Tuple2<>(value2.f0, value1.f1 + value2.f1);
 		}
 	}
@@ -389,9 +376,11 @@ public class NonKeyedWindowOperatorTest {
 
 	@Parameterized.Parameters(name = "WindowBuffer = {0}")
 	@SuppressWarnings("unchecked,rawtypes")
-	public static Collection<WindowBufferFactory[]> windowBuffers(){
-		return Arrays.asList(new WindowBufferFactory[]{new PreAggregatingHeapWindowBuffer.Factory(new RichSumReducer())},
-				new WindowBufferFactory[]{new HeapWindowBuffer.Factory()}
+	public static Collection<WindowBufferFactory[]> windowBuffers() {
+		TupleSerializer<Tuple2> tuple2TupleSerializer = new TupleSerializer<>(Tuple2.class,
+				new TypeSerializer<?>[]{StringSerializer.INSTANCE, IntSerializer.INSTANCE});
+		return Arrays.asList(new WindowBufferFactory[]{new ReducingWindowBuffer.Factory(new SumReducer(), tuple2TupleSerializer)},
+				new WindowBufferFactory[]{new ListWindowBuffer.Factory(tuple2TupleSerializer)}
 				);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/75e03cac/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala
index f73307c..a676757 100644
--- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala
@@ -32,7 +32,7 @@ import org.apache.flink.streaming.api.windowing.evictors.{CountEvictor, TimeEvic
 import org.apache.flink.streaming.api.windowing.time.Time
 import org.apache.flink.streaming.api.windowing.triggers.{ProcessingTimeTrigger, CountTrigger}
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow
-import org.apache.flink.streaming.runtime.operators.windowing.buffers.{HeapWindowBuffer, PreAggregatingHeapWindowBuffer}
+import org.apache.flink.streaming.runtime.operators.windowing.buffers.{ListWindowBuffer, ReducingWindowBuffer}
 import org.apache.flink.streaming.runtime.operators.windowing._
 import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
 import org.apache.flink.util.Collector
@@ -111,12 +111,12 @@ class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase {
 
     val operator1 = transform1.getOperator
 
-    assertTrue(operator1.isInstanceOf[NonKeyedWindowOperator[_, _, _]])
-    val winOperator1 = operator1.asInstanceOf[NonKeyedWindowOperator[_, _, _]]
+    assertTrue(operator1.isInstanceOf[NonKeyedWindowOperator[_, _, _, _]])
+    val winOperator1 = operator1.asInstanceOf[NonKeyedWindowOperator[_, _, _, _]]
     assertTrue(winOperator1.getTrigger.isInstanceOf[CountTrigger[_]])
     assertTrue(winOperator1.getWindowAssigner.isInstanceOf[SlidingTimeWindows])
     assertTrue(
-      winOperator1.getWindowBufferFactory.isInstanceOf[PreAggregatingHeapWindowBuffer.Factory[_]])
+      winOperator1.getWindowBufferFactory.isInstanceOf[ReducingWindowBuffer.Factory[_]])
 
 
     val window2 = source
@@ -134,11 +134,11 @@ class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase {
 
     val operator2 = transform2.getOperator
 
-    assertTrue(operator2.isInstanceOf[NonKeyedWindowOperator[_, _, _]])
-    val winOperator2 = operator2.asInstanceOf[NonKeyedWindowOperator[_, _, _]]
+    assertTrue(operator2.isInstanceOf[NonKeyedWindowOperator[_, _, _, _]])
+    val winOperator2 = operator2.asInstanceOf[NonKeyedWindowOperator[_, _, _, _]]
     assertTrue(winOperator2.getTrigger.isInstanceOf[CountTrigger[_]])
     assertTrue(winOperator2.getWindowAssigner.isInstanceOf[TumblingTimeWindows])
-    assertTrue(winOperator2.getWindowBufferFactory.isInstanceOf[HeapWindowBuffer.Factory[_]])
+    assertTrue(winOperator2.getWindowBufferFactory.isInstanceOf[ListWindowBuffer.Factory[_]])
   }
 
   @Test
@@ -161,12 +161,12 @@ class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase {
 
     val operator1 = transform1.getOperator
 
-    assertTrue(operator1.isInstanceOf[EvictingNonKeyedWindowOperator[_, _, _]])
-    val winOperator1 = operator1.asInstanceOf[EvictingNonKeyedWindowOperator[_, _, _]]
+    assertTrue(operator1.isInstanceOf[EvictingNonKeyedWindowOperator[_, _, _, _]])
+    val winOperator1 = operator1.asInstanceOf[EvictingNonKeyedWindowOperator[_, _, _, _]]
     assertTrue(winOperator1.getTrigger.isInstanceOf[ProcessingTimeTrigger])
     assertTrue(winOperator1.getEvictor.isInstanceOf[TimeEvictor[_]])
     assertTrue(winOperator1.getWindowAssigner.isInstanceOf[SlidingProcessingTimeWindows])
-    assertTrue(winOperator1.getWindowBufferFactory.isInstanceOf[HeapWindowBuffer.Factory[_]])
+    assertTrue(winOperator1.getWindowBufferFactory.isInstanceOf[ListWindowBuffer.Factory[_]])
 
 
     val window2 = source
@@ -185,12 +185,12 @@ class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase {
 
     val operator2 = transform2.getOperator
 
-    assertTrue(operator2.isInstanceOf[EvictingNonKeyedWindowOperator[_, _, _]])
-    val winOperator2 = operator2.asInstanceOf[EvictingNonKeyedWindowOperator[_, _, _]]
+    assertTrue(operator2.isInstanceOf[EvictingNonKeyedWindowOperator[_, _, _, _]])
+    val winOperator2 = operator2.asInstanceOf[EvictingNonKeyedWindowOperator[_, _, _, _]]
     assertTrue(winOperator2.getTrigger.isInstanceOf[CountTrigger[_]])
     assertTrue(winOperator2.getEvictor.isInstanceOf[CountEvictor[_]])
     assertTrue(winOperator2.getWindowAssigner.isInstanceOf[TumblingTimeWindows])
-    assertTrue(winOperator2.getWindowBufferFactory.isInstanceOf[HeapWindowBuffer.Factory[_]])
+    assertTrue(winOperator2.getWindowBufferFactory.isInstanceOf[ListWindowBuffer.Factory[_]])
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/flink/blob/75e03cac/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
index d18a45e..4431106 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.test.checkpointing;
 
+import org.apache.flink.api.common.functions.FoldFunction;
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.java.tuple.Tuple2;
@@ -280,6 +281,77 @@ public class EventTimeAllWindowCheckpointingITCase extends TestLogger {
 	}
 
 	@Test
+	public void testPreAggregatedFoldingTumblingTimeWindow() {
+		final int NUM_ELEMENTS_PER_KEY = 3000;
+		final int WINDOW_SIZE = 100;
+		final int NUM_KEYS = 1;
+		FailingSource.reset();
+
+		try {
+			StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
+					"localhost", cluster.getLeaderRPCPort());
+
+			env.setParallelism(PARALLELISM);
+			env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+			env.enableCheckpointing(100);
+			env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0));
+			env.getConfig().disableSysoutLogging();
+
+			env
+					.addSource(new FailingSource(NUM_KEYS,
+							NUM_ELEMENTS_PER_KEY,
+							NUM_ELEMENTS_PER_KEY / 3))
+					.rebalance()
+					.timeWindowAll(Time.of(WINDOW_SIZE, MILLISECONDS))
+					.apply(new Tuple4<>(0L, 0L, 0L, new IntType(0)),
+							new FoldFunction<Tuple2<Long, IntType>, Tuple4<Long, Long, Long, IntType>>() {
+								@Override
+								public Tuple4<Long, Long, Long, IntType> fold(Tuple4<Long, Long, Long, IntType> accumulator,
+										Tuple2<Long, IntType> value) throws Exception {
+									accumulator.f0 = value.f0;
+									accumulator.f3 = new IntType(accumulator.f3.value + value.f1.value);
+									return accumulator;
+								}
+							},
+							new RichAllWindowFunction<Tuple4<Long, Long, Long, IntType>, Tuple4<Long, Long, Long, IntType>, TimeWindow>() {
+
+								private boolean open = false;
+
+								@Override
+								public void open(Configuration parameters) {
+									assertEquals(1, getRuntimeContext().getNumberOfParallelSubtasks());
+									open = true;
+								}
+
+								@Override
+								public void apply(
+										TimeWindow window,
+										Iterable<Tuple4<Long, Long, Long, IntType>> input,
+										Collector<Tuple4<Long, Long, Long, IntType>> out) {
+
+									// validate that the function has been opened properly
+									assertTrue(open);
+
+									for (Tuple4<Long, Long, Long, IntType> in: input) {
+										out.collect(new Tuple4<>(in.f0,
+												window.getStart(),
+												window.getEnd(),
+												in.f3));
+									}
+								}
+							})
+					.addSink(new ValidatingSink(NUM_KEYS, NUM_ELEMENTS_PER_KEY / WINDOW_SIZE)).setParallelism(1);
+
+
+			tryExecute(env, "Tumbling Window Test");
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	@Test
 	public void testPreAggregatedSlidingTimeWindow() {
 		final int NUM_ELEMENTS_PER_KEY = 3000;
 		final int WINDOW_SIZE = 1000;


Mime
View raw message