flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [3/4] flink git commit: [FLINK-2991] Add Folding State and use in WindowOperator
Date Fri, 12 Feb 2016 17:51:35 GMT
[FLINK-2991] Add Folding State and use in WindowOperator

This enables efficient incremental aggregation of fold window.

This also adds:
- WindowedStream.apply(initVal, foldFunction, windowFunction)
- AllWindowedStream.apply(initVal, foldFunction, windowFunction)

This closes #1605


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/94cba899
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/94cba899
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/94cba899

Branch: refs/heads/master
Commit: 94cba8998c726092e2cc80fd022ca40bf0c38ec2
Parents: d93b154
Author: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Authored: Mon Feb 8 14:56:19 2016 +0100
Committer: Stephan Ewen <sewen@apache.org>
Committed: Fri Feb 12 18:51:01 2016 +0100

----------------------------------------------------------------------
 .../streaming/state/RocksDBFoldingState.java    | 187 ++++++++++++++++++
 .../streaming/state/RocksDBListState.java       |   9 +-
 .../streaming/state/RocksDBReducingState.java   |  14 +-
 .../streaming/state/RocksDBStateBackend.java    |  20 +-
 .../streaming/state/RocksDBValueState.java      |  14 +-
 .../contrib/streaming/state/DbStateBackend.java |  18 ++
 .../flink/api/common/state/FoldingState.java    |  37 ++++
 .../common/state/FoldingStateDescriptor.java    | 108 ++++++++++
 .../flink/api/common/state/StateBackend.java    |   8 +
 .../flink/api/common/state/StateDescriptor.java |   2 +-
 .../runtime/state/AbstractStateBackend.java     |  20 ++
 .../runtime/state/GenericFoldingState.java      | 133 +++++++++++++
 .../flink/runtime/state/GenericListState.java   |   6 +
 .../runtime/state/GenericReducingState.java     |   7 +
 .../state/filesystem/FsFoldingState.java        | 145 ++++++++++++++
 .../state/filesystem/FsStateBackend.java        |   7 +
 .../runtime/state/memory/MemFoldingState.java   | 118 +++++++++++
 .../state/memory/MemoryStateBackend.java        |   7 +
 .../runtime/state/StateBackendTestBase.java     | 105 ++++++++++
 .../api/datastream/AllWindowedStream.java       | 103 ++++++++--
 .../api/datastream/WindowedStream.java          | 198 +++++++++++++------
 .../windowing/FoldAllWindowFunction.java        |  92 ---------
 .../windowing/FoldApplyAllWindowFunction.java   |  95 +++++++++
 .../windowing/FoldApplyWindowFunction.java      |  95 +++++++++
 .../functions/windowing/FoldWindowFunction.java |  91 ---------
 .../windowing/PassThroughAllWindowFunction.java |  30 +++
 .../windowing/PassThroughWindowFunction.java    |  30 +++
 .../windowing/ReduceAllWindowFunction.java      |  30 ---
 .../windowing/ReduceWindowFunction.java         |  30 ---
 .../ReduceWindowFunctionWithWindow.java         |  31 ---
 .../operators/FoldApplyWindowFunctionTest.java  | 143 ++++++++++++++
 .../api/operators/FoldWindowFunctionTest.java   | 132 -------------
 .../operators/windowing/WindowOperatorTest.java |  10 +-
 .../runtime/state/StateBackendITCase.java       |   8 +
 .../streaming/api/scala/AllWindowedStream.scala |  60 ++++++
 .../streaming/api/scala/WindowedStream.scala    |  60 ++++++
 36 files changed, 1707 insertions(+), 496 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/94cba899/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java
new file mode 100644
index 0000000..7e4e573
--- /dev/null
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.api.common.functions.FoldFunction;
+import org.apache.flink.api.common.state.FoldingState;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
+import org.apache.flink.api.common.state.ReducingState;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.KvState;
+import org.apache.flink.runtime.state.KvStateSnapshot;
+import org.rocksdb.RocksDBException;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * {@link ReducingState} implementation that stores state in RocksDB.
+ *
+ * @param <K> The type of the key.
+ * @param <N> The type of the namespace.
+ * @param <T> The type of the values that can be folded into the state.
+ * @param <ACC> The type of the value in the folding state.
+ * @param <Backend> The type of the backend that snapshots this key/value state.
+ */
+public class RocksDBFoldingState<K, N, T, ACC, Backend extends AbstractStateBackend>
+	extends AbstractRocksDBState<K, N, FoldingState<T, ACC>, FoldingStateDescriptor<T, ACC>, Backend>
+	implements FoldingState<T, ACC> {
+
+	/** Serializer for the values */
+	private final TypeSerializer<ACC> valueSerializer;
+
+	/** This holds the name of the state and can create an initial default value for the state. */
+	protected final FoldingStateDescriptor<T, ACC> stateDesc;
+
+	/** User-specified fold function */
+	private final FoldFunction<T, ACC> foldFunction;
+
+	/**
+	 * Creates a new {@code RocksDBFoldingState}.
+	 *
+	 * @param keySerializer The serializer for the keys.
+	 * @param namespaceSerializer The serializer for the namespace.
+	 * @param stateDesc The state identifier for the state. This contains name
+	 *                     and can create a default state value.
+	 * @param dbPath The path on the local system where RocksDB data should be stored.
+	 * @param backupPath The path where to store backups.
+	 */
+	protected RocksDBFoldingState(TypeSerializer<K> keySerializer,
+		TypeSerializer<N> namespaceSerializer,
+		FoldingStateDescriptor<T, ACC> stateDesc,
+		File dbPath,
+		String backupPath) {
+		super(keySerializer, namespaceSerializer, dbPath, backupPath);
+		this.stateDesc = requireNonNull(stateDesc);
+		this.valueSerializer = stateDesc.getSerializer();
+		this.foldFunction = stateDesc.getFoldFunction();
+	}
+
+	/**
+	 * Creates a {@code RocksDBFoldingState} by restoring from a directory.
+	 *
+	 * @param keySerializer The serializer for the keys.
+	 * @param namespaceSerializer The serializer for the namespace.
+	 * @param stateDesc The state identifier for the state. This contains name
+	 *                     and can create a default state value.
+	 * @param dbPath The path on the local system where RocksDB data should be stored.
+	 * @param backupPath The path where to store backups.
+	 * @param restorePath The path on the local file system that we are restoring from.
+	 */
+	protected RocksDBFoldingState(TypeSerializer<K> keySerializer,
+		TypeSerializer<N> namespaceSerializer,
+		FoldingStateDescriptor<T, ACC> stateDesc,
+		File dbPath,
+		String backupPath,
+		String restorePath) {
+		super(keySerializer, namespaceSerializer, dbPath, backupPath, restorePath);
+		this.stateDesc = stateDesc;
+		this.valueSerializer = stateDesc.getSerializer();
+		this.foldFunction = stateDesc.getFoldFunction();
+	}
+
+	@Override
+	public ACC get() {
+		ByteArrayOutputStream baos = new ByteArrayOutputStream();
+		DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(baos);
+		try {
+			writeKeyAndNamespace(out);
+			byte[] key = baos.toByteArray();
+			byte[] valueBytes = db.get(key);
+			if (valueBytes == null) {
+				return stateDesc.getDefaultValue();
+			}
+			return valueSerializer.deserialize(new DataInputViewStreamWrapper(new ByteArrayInputStream(valueBytes)));
+		} catch (IOException|RocksDBException e) {
+			throw new RuntimeException("Error while retrieving data from RocksDB", e);
+		}
+	}
+
+	@Override
+	public void add(T value) throws IOException {
+		ByteArrayOutputStream baos = new ByteArrayOutputStream();
+		DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(baos);
+		try {
+			writeKeyAndNamespace(out);
+			byte[] key = baos.toByteArray();
+			byte[] valueBytes = db.get(key);
+
+			if (valueBytes == null) {
+				baos.reset();
+				valueSerializer.serialize(foldFunction.fold(stateDesc.getDefaultValue(), value), out);
+				db.put(key, baos.toByteArray());
+			} else {
+				ACC oldValue = valueSerializer.deserialize(new DataInputViewStreamWrapper(new ByteArrayInputStream(valueBytes)));
+				ACC newValue = foldFunction.fold(oldValue, value);
+				baos.reset();
+				valueSerializer.serialize(newValue, out);
+				db.put(key, baos.toByteArray());
+			}
+		} catch (Exception e) {
+			throw new RuntimeException("Error while adding data to RocksDB", e);
+		}
+	}
+
+	@Override
+	protected KvStateSnapshot<K, N, FoldingState<T, ACC>, FoldingStateDescriptor<T, ACC>, Backend> createRocksDBSnapshot(
+		URI backupUri,
+		long checkpointId) {
+		return new Snapshot<>(dbPath, checkpointPath, backupUri, checkpointId, keySerializer, namespaceSerializer, stateDesc);
+	}
+
+	private static class Snapshot<K, N, T, ACC, Backend extends AbstractStateBackend> extends AbstractRocksDBSnapshot<K, N, FoldingState<T, ACC>, FoldingStateDescriptor<T, ACC>, Backend> {
+		private static final long serialVersionUID = 1L;
+
+		public Snapshot(File dbPath,
+			String checkpointPath,
+			URI backupUri,
+			long checkpointId,
+			TypeSerializer<K> keySerializer,
+			TypeSerializer<N> namespaceSerializer,
+			FoldingStateDescriptor<T, ACC> stateDesc) {
+			super(dbPath,
+				checkpointPath,
+				backupUri,
+				checkpointId,
+				keySerializer,
+				namespaceSerializer,
+				stateDesc);
+		}
+
+		@Override
+		protected KvState<K, N, FoldingState<T, ACC>, FoldingStateDescriptor<T, ACC>, Backend> createRocksDBState(
+			TypeSerializer<K> keySerializer,
+			TypeSerializer<N> namespaceSerializer,
+			FoldingStateDescriptor<T, ACC> stateDesc,
+			File dbPath,
+			String backupPath,
+			String restorePath) throws Exception {
+			return new RocksDBFoldingState<>(keySerializer, namespaceSerializer, stateDesc, dbPath, checkpointPath, restorePath);
+		}
+	}
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/94cba899/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
index da07f75..6c55566 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
@@ -62,8 +62,9 @@ public class RocksDBListState<K, N, V>
 	 * @param keySerializer The serializer for the keys.
 	 * @param namespaceSerializer The serializer for the namespace.
 	 * @param stateDesc The state identifier for the state. This contains name
-	 *                           and can create a default state value.
+	 *                     and can create a default state value.
 	 * @param dbPath The path on the local system where RocksDB data should be stored.
+	 * @param backupPath The path where to store backups.
 	 */
 	protected RocksDBListState(TypeSerializer<K> keySerializer,
 			TypeSerializer<N> namespaceSerializer,
@@ -78,13 +79,15 @@ public class RocksDBListState<K, N, V>
 	}
 
 	/**
-	 * Creates a new {@code RocksDBListState}.
+	 * Creates a {@code RocksDBListState} by restoring from a directory.
 	 *
 	 * @param keySerializer The serializer for the keys.
 	 * @param namespaceSerializer The serializer for the namespace.
 	 * @param stateDesc The state identifier for the state. This contains name
-	 *                           and can create a default state value.
+	 *                     and can create a default state value.
 	 * @param dbPath The path on the local system where RocksDB data should be stored.
+	 * @param backupPath The path where to store backups.
+	 * @param restorePath The path on the local file system that we are restoring from.
 	 */
 	protected RocksDBListState(TypeSerializer<K> keySerializer,
 			TypeSerializer<N> namespaceSerializer,

http://git-wip-us.apache.org/repos/asf/flink/blob/94cba899/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java
index 81f9ffb..b7ba3c7 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java
@@ -63,8 +63,9 @@ public class RocksDBReducingState<K, N, V>
 	 * @param keySerializer The serializer for the keys.
 	 * @param namespaceSerializer The serializer for the namespace.
 	 * @param stateDesc The state identifier for the state. This contains name
-	 *                           and can create a default state value.
+	 *                     and can create a default state value.
 	 * @param dbPath The path on the local system where RocksDB data should be stored.
+	 * @param backupPath The path where to store backups.
 	 */
 	protected RocksDBReducingState(TypeSerializer<K> keySerializer,
 			TypeSerializer<N> namespaceSerializer,
@@ -79,6 +80,17 @@ public class RocksDBReducingState<K, N, V>
 		this.reduceFunction = stateDesc.getReduceFunction();
 	}
 
+	/**
+	 * Creates a {@code RocksDBReducingState} by restoring from a directory.
+	 *
+	 * @param keySerializer The serializer for the keys.
+	 * @param namespaceSerializer The serializer for the namespace.
+	 * @param stateDesc The state identifier for the state. This contains name
+	 *                     and can create a default state value.
+	 * @param dbPath The path on the local system where RocksDB data should be stored.
+	 * @param backupPath The path where to store backups.
+	 * @param restorePath The path on the local file system that we are restoring from.
+	 */
 	protected RocksDBReducingState(TypeSerializer<K> keySerializer,
 			TypeSerializer<N> namespaceSerializer,
 			ReducingStateDescriptor<V> stateDesc,

http://git-wip-us.apache.org/repos/asf/flink/blob/94cba899/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
index 5b16e86..b323c5e 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
@@ -26,6 +26,8 @@ import java.util.List;
 import java.util.Random;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.state.FoldingState;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.ReducingState;
@@ -242,7 +244,8 @@ public class RocksDBStateBackend extends AbstractStateBackend {
 	
 	@Override
 	protected <N, T> ValueState<T> createValueState(TypeSerializer<N> namespaceSerializer,
-		ValueStateDescriptor<T> stateDesc) throws Exception {
+			ValueStateDescriptor<T> stateDesc) throws Exception {
+
 		File dbPath = getDbPath(stateDesc.getName());
 		String checkpointPath = getCheckpointPath(stateDesc.getName());
 		
@@ -252,7 +255,8 @@ public class RocksDBStateBackend extends AbstractStateBackend {
 
 	@Override
 	protected <N, T> ListState<T> createListState(TypeSerializer<N> namespaceSerializer,
-		ListStateDescriptor<T> stateDesc) throws Exception {
+			ListStateDescriptor<T> stateDesc) throws Exception {
+
 		File dbPath = getDbPath(stateDesc.getName());
 		String checkpointPath = getCheckpointPath(stateDesc.getName());
 		
@@ -262,7 +266,8 @@ public class RocksDBStateBackend extends AbstractStateBackend {
 
 	@Override
 	protected <N, T> ReducingState<T> createReducingState(TypeSerializer<N> namespaceSerializer,
-		ReducingStateDescriptor<T> stateDesc) throws Exception {
+			ReducingStateDescriptor<T> stateDesc) throws Exception {
+
 		File dbPath = getDbPath(stateDesc.getName());
 		String checkpointPath = getCheckpointPath(stateDesc.getName());
 		
@@ -271,6 +276,15 @@ public class RocksDBStateBackend extends AbstractStateBackend {
 	}
 
 	@Override
+	protected <N, T, ACC> FoldingState<T, ACC> createFoldingState(TypeSerializer<N> namespaceSerializer,
+			FoldingStateDescriptor<T, ACC> stateDesc) throws Exception {
+
+		File dbPath = getDbPath(stateDesc.getName());
+		String checkpointPath = getCheckpointPath(stateDesc.getName());
+		return new RocksDBFoldingState<>(keySerializer, namespaceSerializer, stateDesc, dbPath, checkpointPath);
+	}
+
+	@Override
 	public CheckpointStateOutputStream createCheckpointStateOutputStream(
 			long checkpointID, long timestamp) throws Exception {
 		

http://git-wip-us.apache.org/repos/asf/flink/blob/94cba899/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java
index 388f099..7a19153 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java
@@ -54,13 +54,14 @@ public class RocksDBValueState<K, N, V>
 	protected final ValueStateDescriptor<V> stateDesc;
 
 	/**
-	 * Creates a new {@code RocksDBReducingState}.
+	 * Creates a new {@code RocksDBValueState}.
 	 *
 	 * @param keySerializer The serializer for the keys.
 	 * @param namespaceSerializer The serializer for the namespace.
 	 * @param stateDesc The state identifier for the state. This contains name
 	 *                           and can create a default state value.
 	 * @param dbPath The path on the local system where RocksDB data should be stored.
+	 * @param backupPath The path where to store backups.
 	 */
 	protected RocksDBValueState(TypeSerializer<K> keySerializer,
 			TypeSerializer<N> namespaceSerializer,
@@ -74,6 +75,17 @@ public class RocksDBValueState<K, N, V>
 		this.valueSerializer = stateDesc.getSerializer();
 	}
 
+	/**
+	 * Creates a {@code RocksDBValueState} by restoring from a directory.
+	 *
+	 * @param keySerializer The serializer for the keys.
+	 * @param namespaceSerializer The serializer for the namespace.
+	 * @param stateDesc The state identifier for the state. This contains name
+	 *                           and can create a default state value.
+	 * @param dbPath The path on the local system where RocksDB data should be stored.
+	 * @param backupPath The path where to store backups.
+	 * @param restorePath The path on the local file system that we are restoring from.
+	 */
 	protected RocksDBValueState(TypeSerializer<K> keySerializer,
 			TypeSerializer<N> namespaceSerializer,
 			ValueStateDescriptor<V> stateDesc,

http://git-wip-us.apache.org/repos/asf/flink/blob/94cba899/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbStateBackend.java b/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbStateBackend.java
index 5162983..d82bfb2 100644
--- a/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbStateBackend.java
+++ b/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbStateBackend.java
@@ -17,6 +17,8 @@
 
 package org.apache.flink.contrib.streaming.state;
 
+import org.apache.flink.api.common.state.FoldingState;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.ReducingState;
@@ -27,6 +29,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.runtime.state.ArrayListSerializer;
+import org.apache.flink.runtime.state.GenericFoldingState;
 import org.apache.flink.runtime.state.GenericListState;
 import org.apache.flink.runtime.state.GenericReducingState;
 import org.apache.flink.runtime.state.StateHandle;
@@ -241,6 +244,21 @@ public class DbStateBackend extends AbstractStateBackend {
 	}
 
 	@Override
+	protected <N, T, ACC> FoldingState<T, ACC> createFoldingState(TypeSerializer<N> namespaceSerializer,
+			FoldingStateDescriptor<T, ACC> stateDesc) throws Exception {
+
+		if (!stateDesc.isSerializerInitialized()) {
+			throw new IllegalArgumentException("state descriptor serializer not initialized");
+		}
+
+		ValueStateDescriptor<ACC> valueStateDescriptor = new ValueStateDescriptor<>(
+			stateDesc.getName(), stateDesc.getSerializer(), stateDesc.getDefaultValue());
+
+		ValueState<ACC> valueState = createValueState(namespaceSerializer, valueStateDescriptor);
+		return new GenericFoldingState<>(valueState, stateDesc.getFoldFunction());
+	}
+
+	@Override
 	public void initializeForJob(final Environment env,
 		String operatorIdentifier,
 		TypeSerializer<?> keySerializer) throws Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/94cba899/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingState.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingState.java b/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingState.java
new file mode 100644
index 0000000..d328c2e
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingState.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.state;
+
+/**
+ * {@link State} interface for folding state. Elements can be added to the state, they will
+ * be successively added to the initial value using a
+ * {@link org.apache.flink.api.common.functions.FoldFunction}. The current state can be inspected.
+ *
+ * <p>The state is accessed and modified by user functions, and checkpointed consistently
+ * by the system as part of the distributed snapshots.
+ * 
+ * <p>The state is only accessible by functions applied on a KeyedDataStream. The key is
+ * automatically supplied by the system, so the function always sees the value mapped to the
+ * key of the current element. That way, the system can handle stream and state partitioning
+ * consistently together.
+ * 
+ * @param <T> Type of the values folded into the state
+ * @param <ACC> Type of the value in the state
+ */
+public interface FoldingState<T, ACC> extends MergingState<T, ACC> {}

http://git-wip-us.apache.org/repos/asf/flink/blob/94cba899/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingStateDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingStateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingStateDescriptor.java
new file mode 100644
index 0000000..52ad712
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingStateDescriptor.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.state;
+
+import org.apache.flink.api.common.functions.FoldFunction;
+import org.apache.flink.api.common.functions.RichFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * {@link StateDescriptor} for {@link FoldingState}. This can be used to create partitioned
+ * folding state.
+ *
+ * @param <T> Type of the values folded int othe state
+ * @param <ACC> Type of the value in the state
+ */
+public class FoldingStateDescriptor<T, ACC> extends StateDescriptor<FoldingState<T, ACC>, ACC> {
+	private static final long serialVersionUID = 1L;
+
+
+	private final FoldFunction<T, ACC> foldFunction;
+
+	/**
+	 * Creates a new {@code FoldingStateDescriptor} with the given name, type, and initial value.
+	 *
+	 * <p>If this constructor fails (because it is not possible to describe the type via a class),
+	 * consider using the {@link #FoldingStateDescriptor(String, ACC, FoldFunction, TypeInformation)} constructor.
+	 *
+	 * @param name The (unique) name for the state.
+	 * @param initialValue The initial value of the fold.
+	 * @param foldFunction The {@code FoldFunction} used to aggregate the state.
+	 * @param typeClass The type of the values in the state.
+	 */
+	public FoldingStateDescriptor(String name, ACC initialValue, FoldFunction<T, ACC> foldFunction, Class<ACC> typeClass) {
+		super(name, typeClass, initialValue);
+		this.foldFunction = requireNonNull(foldFunction);
+
+		if (foldFunction instanceof RichFunction) {
+			throw new UnsupportedOperationException("FoldFunction of FoldingState can not be a RichFunction.");
+		}
+	}
+
+	/**
+	 * Creates a new {@code FoldingStateDescriptor} with the given name and default value.
+	 *
+	 * @param name The (unique) name for the state.
+	 * @param initialValue The initial value of the fold.
+	 * @param foldFunction The {@code FoldFunction} used to aggregate the state.
+	 * @param typeInfo The type of the values in the state.
+	 */
+	public FoldingStateDescriptor(String name, ACC initialValue, FoldFunction<T, ACC> foldFunction, TypeInformation<ACC> typeInfo) {
+		super(name, typeInfo, initialValue);
+		this.foldFunction = requireNonNull(foldFunction);
+
+		if (foldFunction instanceof RichFunction) {
+			throw new UnsupportedOperationException("FoldFunction of FoldingState can not be a RichFunction.");
+		}
+	}
+
+	/**
+	 * Creates a new {@code ValueStateDescriptor} with the given name and default value.
+	 *
+	 * @param name The (unique) name for the state.
+	 * @param initialValue The initial value of the fold.
+	 * @param foldFunction The {@code FoldFunction} used to aggregate the state.
+	 * @param typeSerializer The type serializer of the values in the state.
+	 */
+	public FoldingStateDescriptor(String name, ACC initialValue, FoldFunction<T, ACC> foldFunction, TypeSerializer<ACC> typeSerializer) {
+		super(name, typeSerializer, initialValue);
+		this.foldFunction = requireNonNull(foldFunction);
+
+		if (foldFunction instanceof RichFunction) {
+			throw new UnsupportedOperationException("FoldFunction of FoldingState can not be a RichFunction.");
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	
+	@Override
+	public FoldingState<T, ACC> bind(StateBackend stateBackend) throws Exception {
+		return stateBackend.createFoldingState(this);
+	}
+
+	/**
+	 * Returns the fold function to be used for the folding state.
+	 */
+	public FoldFunction<T, ACC> getFoldFunction() {
+		return foldFunction;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/94cba899/flink-core/src/main/java/org/apache/flink/api/common/state/StateBackend.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/StateBackend.java b/flink-core/src/main/java/org/apache/flink/api/common/state/StateBackend.java
index 8c7c608..a61433a 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/StateBackend.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/StateBackend.java
@@ -48,4 +48,12 @@ public interface StateBackend {
 	 */
 	<T> ReducingState<T> createReducingState(ReducingStateDescriptor<T> stateDesc) throws Exception;
 
+	/**
+	 * Creates and returns a new {@link FoldingState}.
+	 * @param stateDesc The {@code StateDescriptor} that contains the name of the state.
+	 *
+	 * @param <T> Type of the values folded into the state
+	 * @param <ACC> Type of the value in the state
+	 */
+	<T, ACC> FoldingState<T, ACC> createFoldingState(FoldingStateDescriptor<T, ACC> stateDesc) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/94cba899/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
index 38087fc..4cf2371 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
@@ -229,7 +229,7 @@ public abstract class StateDescriptor<S extends State, T> implements Serializabl
 	@Override
 	public String toString() {
 		return getClass().getSimpleName() + 
-				"{ name=" + name +
+				"{name=" + name +
 				", defaultValue=" + defaultValue +
 				", serializer=" + serializer +
 				'}';

http://git-wip-us.apache.org/repos/asf/flink/blob/94cba899/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
index d0c4f82..beccd86 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
@@ -19,6 +19,8 @@
 package org.apache.flink.runtime.state;
 
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.FoldingState;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.ReducingState;
@@ -148,6 +150,18 @@ public abstract class AbstractStateBackend implements java.io.Serializable {
 	protected abstract <N, T> ReducingState<T> createReducingState(TypeSerializer<N> namespaceSerializer, ReducingStateDescriptor<T> stateDesc) throws Exception;
 
 	/**
+	 * Creates and returns a new {@link FoldingState}.
+	 *
+	 * @param namespaceSerializer TypeSerializer for the state namespace.
+	 * @param stateDesc The {@code StateDescriptor} that contains the name of the state.
+	 *
+	 * @param <N> The type of the namespace.
+	 * @param <T> Type of the values folded into the state
+	 * @param <ACC> Type of the value in the state	 *
+	 */
+	abstract protected <N, T, ACC> FoldingState<T, ACC> createFoldingState(TypeSerializer<N> namespaceSerializer, FoldingStateDescriptor<T, ACC> stateDesc) throws Exception;
+
+	/**
 	 * Sets the current key that is used for partitioned state.
 	 * @param currentKey The current key.
 	 */
@@ -223,6 +237,12 @@ public abstract class AbstractStateBackend implements java.io.Serializable {
 			public <T> ReducingState<T> createReducingState(ReducingStateDescriptor<T> stateDesc) throws Exception {
 				return AbstractStateBackend.this.createReducingState(namespaceSerializer, stateDesc);
 			}
+
+			@Override
+			public <T, ACC> FoldingState<T, ACC> createFoldingState(FoldingStateDescriptor<T, ACC> stateDesc) throws Exception {
+				return AbstractStateBackend.this.createFoldingState(namespaceSerializer, stateDesc);
+			}
+
 		});
 
 		keyValueStatesByName.put(stateDescriptor.getName(), (KvState) kvstate);

http://git-wip-us.apache.org/repos/asf/flink/blob/94cba899/flink-runtime/src/main/java/org/apache/flink/runtime/state/GenericFoldingState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/GenericFoldingState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/GenericFoldingState.java
new file mode 100644
index 0000000..ef1d796
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/GenericFoldingState.java
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.functions.FoldFunction;
+import org.apache.flink.api.common.state.FoldingState;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+/**
+ * Generic implementation of {@link FoldingState} based on a wrapped {@link ValueState}.
+ *
+ * @param <K> The type of the key.
+ * @param <N> The type of the namespace.
+ * @param <T> The type of the values that can be folded into the state.
+ * @param <ACC> The type of the value in the folding state.
+ * @param <Backend> The type of {@link AbstractStateBackend} that manages this {@code KvState}.
+ * @param <W> Generic type that extends both the underlying {@code ValueState} and {@code KvState}.
+ */
+public class GenericFoldingState<K, N, T, ACC, Backend extends AbstractStateBackend, W extends ValueState<ACC> & KvState<K, N, ValueState<ACC>, ValueStateDescriptor<ACC>, Backend>>
+	implements FoldingState<T, ACC>, KvState<K, N, FoldingState<T, ACC>, FoldingStateDescriptor<T, ACC>, Backend> {
+
+	private final W wrappedState;
+	private final FoldFunction<T, ACC> foldFunction;
+
+	/**
+	 * Creates a new {@code FoldingState} that wraps the given {@link ValueState}. The
+	 * {@code ValueState} must have the initial value of the fold as default value.
+	 *
+	 * @param wrappedState The wrapped {@code ValueState}
+	 * @param foldFunction The {@code FoldFunction} to use for folding values into the state
+	 */
+	@SuppressWarnings("unchecked")
+	public GenericFoldingState(ValueState<ACC> wrappedState, FoldFunction<T, ACC> foldFunction) {
+		if (!(wrappedState instanceof KvState)) {
+			throw new IllegalArgumentException("Wrapped state must be a KvState.");
+		}
+		this.wrappedState = (W) wrappedState;
+		this.foldFunction = foldFunction;
+	}
+
+	@Override
+	public void setCurrentKey(K key) {
+		wrappedState.setCurrentKey(key);
+	}
+
+	@Override
+	public void setCurrentNamespace(N namespace) {
+		wrappedState.setCurrentNamespace(namespace);
+	}
+
+	@Override
+	public KvStateSnapshot<K, N, FoldingState<T, ACC>, FoldingStateDescriptor<T, ACC>, Backend> snapshot(
+		long checkpointId,
+		long timestamp) throws Exception {
+		KvStateSnapshot<K, N, ValueState<ACC>, ValueStateDescriptor<ACC>, Backend> wrappedSnapshot = wrappedState.snapshot(
+			checkpointId,
+			timestamp);
+		return new Snapshot<>(wrappedSnapshot, foldFunction);
+	}
+
+	@Override
+	public void dispose() {
+		wrappedState.dispose();
+	}
+
+	@Override
+	public ACC get() throws Exception {
+		return wrappedState.value();
+	}
+
+	@Override
+	public void add(T value) throws Exception {
+		ACC currentValue = wrappedState.value();
+		wrappedState.update(foldFunction.fold(currentValue, value));
+	}
+
+	@Override
+	public void clear() {
+		wrappedState.clear();
+	}
+
+	private static class Snapshot<K, N, T, ACC, Backend extends AbstractStateBackend> implements KvStateSnapshot<K, N, FoldingState<T, ACC>, FoldingStateDescriptor<T, ACC>, Backend> {
+		private static final long serialVersionUID = 1L;
+
+		private final KvStateSnapshot<K, N, ValueState<ACC>, ValueStateDescriptor<ACC>, Backend> wrappedSnapshot;
+
+		private final FoldFunction<T, ACC> foldFunction;
+
+		public Snapshot(KvStateSnapshot<K, N, ValueState<ACC>, ValueStateDescriptor<ACC>, Backend> wrappedSnapshot,
+			FoldFunction<T, ACC> foldFunction) {
+			this.wrappedSnapshot = wrappedSnapshot;
+			this.foldFunction = foldFunction;
+		}
+
+		@Override
+		@SuppressWarnings("unchecked")
+		public KvState<K, N, FoldingState<T, ACC>, FoldingStateDescriptor<T, ACC>, Backend> restoreState(
+			Backend stateBackend,
+			TypeSerializer<K> keySerializer,
+			ClassLoader classLoader,
+			long recoveryTimestamp) throws Exception {
+			return new GenericFoldingState((ValueState<ACC>) wrappedSnapshot.restoreState(stateBackend, keySerializer, classLoader, recoveryTimestamp), foldFunction);
+		}
+
+		@Override
+		public void discardState() throws Exception {
+			wrappedSnapshot.discardState();
+		}
+
+		@Override
+		public long getStateSize() throws Exception {
+			return wrappedSnapshot.getStateSize();
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/94cba899/flink-runtime/src/main/java/org/apache/flink/runtime/state/GenericListState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/GenericListState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/GenericListState.java
index c20962f..fbb0170 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/GenericListState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/GenericListState.java
@@ -40,6 +40,12 @@ public class GenericListState<K, N, T, Backend extends AbstractStateBackend, W e
 
 	private final W wrappedState;
 
+	/**
+	 * Creates a new {@code ListState} that wraps the given {@link ValueState}. The
+	 * {@code ValueState} must have a default value of {@code null}.
+	 *
+	 * @param wrappedState The wrapped {@code ValueState}
+	 */
 	@SuppressWarnings("unchecked")
 	public GenericListState(ValueState<ArrayList<T>> wrappedState) {
 		if (!(wrappedState instanceof KvState)) {

http://git-wip-us.apache.org/repos/asf/flink/blob/94cba899/flink-runtime/src/main/java/org/apache/flink/runtime/state/GenericReducingState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/GenericReducingState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/GenericReducingState.java
index 1181c66..102e25e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/GenericReducingState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/GenericReducingState.java
@@ -39,6 +39,13 @@ public class GenericReducingState<K, N, T, Backend extends AbstractStateBackend,
 	private final W wrappedState;
 	private final ReduceFunction<T> reduceFunction;
 
+	/**
+	 * Creates a new {@code ReducingState} that wraps the given {@link ValueState}. The
+	 * {@code ValueState} must have a default value of {@code null}.
+	 *
+	 * @param wrappedState The wrapped {@code ValueState}
+	 * @param reduceFunction The {@code ReduceFunction} to use for combining values.
+	 */
 	@SuppressWarnings("unchecked")
 	public GenericReducingState(ValueState<T> wrappedState, ReduceFunction<T> reduceFunction) {
 		if (!(wrappedState instanceof KvState)) {

http://git-wip-us.apache.org/repos/asf/flink/blob/94cba899/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsFoldingState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsFoldingState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsFoldingState.java
new file mode 100644
index 0000000..bba6df5
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsFoldingState.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.filesystem;
+
+import org.apache.flink.api.common.functions.FoldFunction;
+import org.apache.flink.api.common.state.FoldingState;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.state.KvState;
+import org.apache.flink.runtime.state.KvStateSnapshot;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Heap-backed partitioned {@link FoldingState} that is
+ * snapshotted into files.
+ *
+ * @param <K> The type of the key.
+ * @param <N> The type of the namespace.
+ * @param <T> The type of the values that can be folded into the state.
+ * @param <ACC> The type of the value in the folding state.
+ */
+public class FsFoldingState<K, N, T, ACC>
+	extends AbstractFsState<K, N, ACC, FoldingState<T, ACC>, FoldingStateDescriptor<T, ACC>>
+	implements FoldingState<T, ACC> {
+
+	private final FoldFunction<T, ACC> foldFunction;
+
+	/**
+	 * Creates a new and empty partitioned state.
+	 *
+	 * @param backend The file system state backend backing snapshots of this state
+	 * @param keySerializer The serializer for the key.
+	 * @param namespaceSerializer The serializer for the namespace.
+	 * @param stateDesc The state identifier for the state. This contains name
+	 *                           and can create a default state value.
+	 */
+	public FsFoldingState(FsStateBackend backend,
+		TypeSerializer<K> keySerializer,
+		TypeSerializer<N> namespaceSerializer,
+		FoldingStateDescriptor<T, ACC> stateDesc) {
+		super(backend, keySerializer, namespaceSerializer, stateDesc.getSerializer(), stateDesc);
+		this.foldFunction = stateDesc.getFoldFunction();
+	}
+
+	/**
+	 * Creates a new key/value state with the given state contents.
+	 * This method is used to re-create key/value state with existing data, for example from
+	 * a snapshot.
+	 *
+	 * @param backend The file system state backend backing snapshots of this state
+	 * @param keySerializer The serializer for the key.
+	 * @param namespaceSerializer The serializer for the namespace.
+	 * @param stateDesc The state identifier for the state. This contains name
+*                           and can create a default state value.
+	 * @param state The map of key/value pairs to initialize the state with.
+	 */
+	public FsFoldingState(FsStateBackend backend,
+		TypeSerializer<K> keySerializer,
+		TypeSerializer<N> namespaceSerializer,
+		FoldingStateDescriptor<T, ACC> stateDesc,
+		HashMap<N, Map<K, ACC>> state) {
+		super(backend, keySerializer, namespaceSerializer, stateDesc.getSerializer(), stateDesc, state);
+		this.foldFunction = stateDesc.getFoldFunction();
+	}
+
+	@Override
+	public ACC get() {
+		if (currentNSState == null) {
+			currentNSState = state.get(currentNamespace);
+		}
+		if (currentNSState != null) {
+			ACC value = currentNSState.get(currentKey);
+			return value != null ? value : stateDesc.getDefaultValue();
+		}
+		return stateDesc.getDefaultValue();
+	}
+
+	@Override
+	public void add(T value) throws IOException {
+		if (currentKey == null) {
+			throw new RuntimeException("No key available.");
+		}
+
+		if (currentNSState == null) {
+			currentNSState = new HashMap<>();
+			state.put(currentNamespace, currentNSState);
+		}
+
+		ACC currentValue = currentNSState.get(currentKey);
+		try {
+			if (currentValue == null) {
+				currentNSState.put(currentKey, foldFunction.fold(stateDesc.getDefaultValue(), value));
+			} else {
+				currentNSState.put(currentKey, foldFunction.fold(currentValue, value));
+
+			}
+		} catch (Exception e) {
+			throw new RuntimeException("Could not add value to folding state.", e);
+		}
+	}
+
+	@Override
+	public KvStateSnapshot<K, N, FoldingState<T, ACC>, FoldingStateDescriptor<T, ACC>, FsStateBackend> createHeapSnapshot(Path filePath) {
+		return new Snapshot<>(getKeySerializer(), getNamespaceSerializer(), stateSerializer, stateDesc, filePath);
+	}
+
+
+	public static class Snapshot<K, N, T, ACC> extends AbstractFsStateSnapshot<K, N, ACC, FoldingState<T, ACC>, FoldingStateDescriptor<T, ACC>> {
+		private static final long serialVersionUID = 1L;
+
+		public Snapshot(TypeSerializer<K> keySerializer,
+			TypeSerializer<N> namespaceSerializer,
+			TypeSerializer<ACC> stateSerializer,
+			FoldingStateDescriptor<T, ACC> stateDescs,
+			Path filePath) {
+			super(keySerializer, namespaceSerializer, stateSerializer, stateDescs, filePath);
+		}
+
+		@Override
+		public KvState<K, N, FoldingState<T, ACC>, FoldingStateDescriptor<T, ACC>, FsStateBackend> createFsState(FsStateBackend backend, HashMap<N, Map<K, ACC>> stateMap) {
+			return new FsFoldingState<>(backend, keySerializer, namespaceSerializer, stateDesc, stateMap);
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/94cba899/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
index 37c1392..77d540b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.runtime.state.filesystem;
 
+import org.apache.flink.api.common.state.FoldingState;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.ReducingState;
@@ -287,6 +289,11 @@ public class FsStateBackend extends AbstractStateBackend {
 		return new FsReducingState<>(this, keySerializer, namespaceSerializer, stateDesc);
 	}
 
+	@Override
+	protected <N, T, ACC> FoldingState<T, ACC> createFoldingState(TypeSerializer<N> namespaceSerializer,
+		FoldingStateDescriptor<T, ACC> stateDesc) throws Exception {
+		return new FsFoldingState<>(this, keySerializer, namespaceSerializer, stateDesc);
+	}
 
 	@Override
 	public <S extends Serializable> StateHandle<S> checkpointStateSerializable(

http://git-wip-us.apache.org/repos/asf/flink/blob/94cba899/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemFoldingState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemFoldingState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemFoldingState.java
new file mode 100644
index 0000000..07b677b
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemFoldingState.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.memory;
+
+import org.apache.flink.api.common.functions.FoldFunction;
+import org.apache.flink.api.common.state.FoldingState;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.KvState;
+import org.apache.flink.runtime.state.KvStateSnapshot;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Heap-backed partitioned {@link FoldingState} that is
+ * snapshotted into a serialized memory copy.
+ *
+ * @param <K> The type of the key.
+ * @param <N> The type of the namespace.
+ * @param <T> The type of the values that can be folded into the state.
+ * @param <ACC> The type of the value in the folding state.
+ */
+public class MemFoldingState<K, N, T, ACC>
+	extends AbstractMemState<K, N, ACC, FoldingState<T, ACC>, FoldingStateDescriptor<T, ACC>>
+	implements FoldingState<T, ACC> {
+
+	private final FoldFunction<T, ACC> foldFunction;
+
+	public MemFoldingState(TypeSerializer<K> keySerializer,
+		TypeSerializer<N> namespaceSerializer,
+		FoldingStateDescriptor<T, ACC> stateDesc) {
+		super(keySerializer, namespaceSerializer, stateDesc.getSerializer(), stateDesc);
+		this.foldFunction = stateDesc.getFoldFunction();
+	}
+
+	public MemFoldingState(TypeSerializer<K> keySerializer,
+		TypeSerializer<N> namespaceSerializer,
+		FoldingStateDescriptor<T, ACC> stateDesc,
+		HashMap<N, Map<K, ACC>> state) {
+		super(keySerializer, namespaceSerializer, stateDesc.getSerializer(), stateDesc, state);
+		this.foldFunction = stateDesc.getFoldFunction();
+	}
+
+	@Override
+	public ACC get() {
+		if (currentNSState == null) {
+			currentNSState = state.get(currentNamespace);
+		}
+		if (currentNSState != null) {
+			ACC value = currentNSState.get(currentKey);
+			return value != null ? value : stateDesc.getDefaultValue();
+		}
+		return stateDesc.getDefaultValue();
+	}
+
+	@Override
+	public void add(T value) throws IOException {
+		if (currentKey == null) {
+			throw new RuntimeException("No key available.");
+		}
+
+		if (currentNSState == null) {
+			currentNSState = new HashMap<>();
+			state.put(currentNamespace, currentNSState);
+		}
+
+		ACC currentValue = currentNSState.get(currentKey);
+		try {
+			if (currentValue == null) {
+				currentNSState.put(currentKey, foldFunction.fold(stateDesc.getDefaultValue(), value));
+			} else {
+					currentNSState.put(currentKey, foldFunction.fold(currentValue, value));
+
+			}
+		} catch (Exception e) {
+			throw new RuntimeException("Could not add value to folding state.", e);
+		}
+	}
+
+	@Override
+	public KvStateSnapshot<K, N, FoldingState<T, ACC>, FoldingStateDescriptor<T, ACC>, MemoryStateBackend> createHeapSnapshot(byte[] bytes) {
+		return new Snapshot<>(getKeySerializer(), getNamespaceSerializer(), stateSerializer, stateDesc, bytes);
+	}
+
+	public static class Snapshot<K, N, T, ACC> extends AbstractMemStateSnapshot<K, N, ACC, FoldingState<T, ACC>, FoldingStateDescriptor<T, ACC>> {
+		private static final long serialVersionUID = 1L;
+
+		public Snapshot(TypeSerializer<K> keySerializer,
+			TypeSerializer<N> namespaceSerializer,
+			TypeSerializer<ACC> stateSerializer,
+			FoldingStateDescriptor<T, ACC> stateDescs, byte[] data) {
+			super(keySerializer, namespaceSerializer, stateSerializer, stateDescs, data);
+		}
+
+		@Override
+		public KvState<K, N, FoldingState<T, ACC>, FoldingStateDescriptor<T, ACC>, MemoryStateBackend> createMemState(HashMap<N, Map<K, ACC>> stateMap) {
+			return new MemFoldingState<>(keySerializer, namespaceSerializer, stateDesc, stateMap);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/94cba899/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
index 2b7b5f1..7b9d21b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.runtime.state.memory;
 
+import org.apache.flink.api.common.state.FoldingState;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.ReducingState;
@@ -97,6 +99,11 @@ public class MemoryStateBackend extends AbstractStateBackend {
 		return new MemReducingState<>(keySerializer, namespaceSerializer, stateDesc);
 	}
 
+	@Override
+	public <N, T, ACC> FoldingState<T, ACC> createFoldingState(TypeSerializer<N> namespaceSerializer, FoldingStateDescriptor<T, ACC> stateDesc) throws Exception {
+		return new MemFoldingState<>(keySerializer, namespaceSerializer, stateDesc);
+	}
+
 	/**
 	 * Serialized the given state into bytes using Java serialization and creates a state handle that
 	 * can re-create that state.

http://git-wip-us.apache.org/repos/asf/flink/blob/94cba899/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
index 6083bd6..27dee6a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
@@ -22,7 +22,10 @@ import com.google.common.base.Joiner;
 
 import org.apache.commons.io.output.ByteArrayOutputStream;
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.FoldFunction;
 import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.state.FoldingState;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.ReducingState;
@@ -410,6 +413,108 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> {
 		}
 	}
 
+	@Test
+	@SuppressWarnings("unchecked,rawtypes")
+	public void testFoldingState() {
+		try {
+			backend.initializeForJob(new DummyEnvironment("test", 1, 0), "test_op", IntSerializer.INSTANCE);
+
+			FoldingStateDescriptor<Integer, String> kvId = new FoldingStateDescriptor<>("id",
+				"Fold-Initial:",
+				new FoldFunction<Integer, String>() {
+					private static final long serialVersionUID = 1L;
+
+					@Override
+					public String fold(String acc, Integer value) throws Exception {
+						return acc + "," + value;
+					}
+				},
+				String.class);
+			FoldingState<Integer, String> state = backend.getPartitionedState(null, VoidSerializer.INSTANCE, kvId);
+
+			@SuppressWarnings("unchecked")
+			KvState<Integer, Void, FoldingState<Integer, String>, FoldingStateDescriptor<Integer, String>, B> kv =
+				(KvState<Integer, Void, FoldingState<Integer, String>, FoldingStateDescriptor<Integer, String>, B>) state;
+
+			Joiner joiner = Joiner.on(",");
+			// some modifications to the state
+			kv.setCurrentKey(1);
+			assertEquals("Fold-Initial:", state.get());
+			state.add(1);
+			kv.setCurrentKey(2);
+			assertEquals("Fold-Initial:", state.get());
+			state.add(2);
+			kv.setCurrentKey(1);
+			assertEquals("Fold-Initial:,1", state.get());
+
+			// draw a snapshot
+			KvStateSnapshot<Integer, Void, FoldingState<Integer, String>, FoldingStateDescriptor<Integer, String>, B> snapshot1 =
+				kv.snapshot(682375462378L, 2);
+
+			// make some more modifications
+			kv.setCurrentKey(1);
+			state.clear();
+			state.add(101);
+			kv.setCurrentKey(2);
+			state.add(102);
+			kv.setCurrentKey(3);
+			state.add(103);
+
+			// draw another snapshot
+			KvStateSnapshot<Integer, Void, FoldingState<Integer, String>, FoldingStateDescriptor<Integer, String>, B> snapshot2 =
+				kv.snapshot(682375462379L, 4);
+
+			// validate the original state
+			kv.setCurrentKey(1);
+			assertEquals("Fold-Initial:,101", state.get());
+			kv.setCurrentKey(2);
+			assertEquals("Fold-Initial:,2,102", state.get());
+			kv.setCurrentKey(3);
+			assertEquals("Fold-Initial:,103", state.get());
+
+			kv.dispose();
+
+			// restore the first snapshot and validate it
+			KvState<Integer, Void, FoldingState<Integer, String>, FoldingStateDescriptor<Integer, String>, B> restored1 = snapshot1.restoreState(
+				backend,
+				IntSerializer.INSTANCE,
+				this.getClass().getClassLoader(), 10);
+
+			snapshot1.discardState();
+
+			@SuppressWarnings("unchecked")
+			FoldingState<Integer, String> restored1State = (FoldingState<Integer, String>) restored1;
+
+			restored1.setCurrentKey(1);
+			assertEquals("Fold-Initial:,1", restored1State.get());
+			restored1.setCurrentKey(2);
+			assertEquals("Fold-Initial:,2", restored1State.get());
+
+			restored1.dispose();
+
+			// restore the second snapshot and validate it
+			KvState<Integer, Void, FoldingState<Integer, String>, FoldingStateDescriptor<Integer, String>, B> restored2 = snapshot2.restoreState(
+				backend,
+				IntSerializer.INSTANCE,
+				this.getClass().getClassLoader(), 20);
+
+			snapshot2.discardState();
+
+			@SuppressWarnings("unchecked")
+			FoldingState<Integer, String> restored2State = (FoldingState<Integer, String>) restored2;
+
+			restored2.setCurrentKey(1);
+			assertEquals("Fold-Initial:,101", restored2State.get());
+			restored2.setCurrentKey(2);
+			assertEquals("Fold-Initial:,2,102", restored2State.get());
+			restored2.setCurrentKey(3);
+			assertEquals("Fold-Initial:,103", restored2State.get());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
 
 	@Test
 	public void testValueStateRestoreWithWrongSerializers() {

http://git-wip-us.apache.org/repos/asf/flink/blob/94cba899/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
index 2902795..d8da998 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
@@ -33,8 +33,9 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction;
 import org.apache.flink.streaming.api.functions.aggregation.ComparableAggregator;
 import org.apache.flink.streaming.api.functions.aggregation.SumAggregator;
-import org.apache.flink.streaming.api.functions.windowing.FoldAllWindowFunction;
 import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
+import org.apache.flink.streaming.api.functions.windowing.PassThroughAllWindowFunction;
+import org.apache.flink.streaming.api.functions.windowing.FoldApplyAllWindowFunction;
 import org.apache.flink.streaming.api.functions.windowing.ReduceApplyAllWindowFunction;
 import org.apache.flink.streaming.api.functions.windowing.ReduceIterableAllWindowFunction;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
@@ -144,7 +145,7 @@ public class AllWindowedStream<T, W extends Window> {
 		function = input.getExecutionEnvironment().clean(function);
 
 		String callLocation = Utils.getCallLocationName();
-		String udfName = "Reduce at " + callLocation;
+		String udfName = "AllWindowedStream." + callLocation;
 
 		SingleOutputStreamOperator<T, ?> result = createFastTimeOperatorIfValid(function, input.getType(), udfName);
 		if (result != null) {
@@ -185,13 +186,15 @@ public class AllWindowedStream<T, W extends Window> {
 	 * @return The data stream that is the result of applying the fold function to the window.
 	 */
 	public <R> SingleOutputStreamOperator<R, ?> fold(R initialValue, FoldFunction<T, R> function) {
-		//clean the closure
-		function = input.getExecutionEnvironment().clean(function);
+		if (function instanceof RichFunction) {
+			throw new UnsupportedOperationException("FoldFunction can not be a RichFunction. " +
+				"Please use apply(FoldFunction, WindowFunction) instead.");
+		}
 
 		TypeInformation<R> resultType = TypeExtractor.getFoldReturnTypes(function, input.getType(),
-				Utils.getCallLocationName(), true);
+			Utils.getCallLocationName(), true);
 
-		return apply(new FoldAllWindowFunction<W, T, R>(initialValue, function), resultType);
+		return fold(initialValue, function, resultType);
 	}
 
 	/**
@@ -203,9 +206,12 @@ public class AllWindowedStream<T, W extends Window> {
 	 * @return The data stream that is the result of applying the fold function to the window.
 	 */
 	public <R> SingleOutputStreamOperator<R, ?> fold(R initialValue, FoldFunction<T, R> function, TypeInformation<R> resultType) {
-		//clean the closure
-		function = input.getExecutionEnvironment().clean(function);
-		return apply(new FoldAllWindowFunction<W, T, R>(initialValue, function), resultType);
+		if (function instanceof RichFunction) {
+			throw new UnsupportedOperationException("FoldFunction can not be a RichFunction. " +
+				"Please use apply(FoldFunction, WindowFunction) instead.");
+		}
+
+		return apply(initialValue, function, new PassThroughAllWindowFunction<W, R>(), resultType);
 	}
 
 	/**
@@ -244,7 +250,7 @@ public class AllWindowedStream<T, W extends Window> {
 		function = input.getExecutionEnvironment().clean(function);
 
 		String callLocation = Utils.getCallLocationName();
-		String udfName = "WindowApply at " + callLocation;
+		String udfName = "AllWindowedStream." + callLocation;
 
 		SingleOutputStreamOperator<R, ?> result = createFastTimeOperatorIfValid(function, resultType, udfName);
 		if (result != null) {
@@ -321,7 +327,7 @@ public class AllWindowedStream<T, W extends Window> {
 		preAggregator = input.getExecutionEnvironment().clean(preAggregator);
 
 		String callLocation = Utils.getCallLocationName();
-		String udfName = "WindowApply at " + callLocation;
+		String udfName = "AllWindowedStream." + callLocation;
 
 		String opName = "TriggerWindow(" + windowAssigner + ", " + trigger + ", " + udfName + ")";
 
@@ -348,6 +354,81 @@ public class AllWindowedStream<T, W extends Window> {
 		return input.transform(opName, resultType, operator).setParallelism(1);
 	}
 
+	/**
+	 * Applies the given window function to each window. The window function is called for each
+	 * evaluation of the window for each key individually. The output of the window function is
+	 * interpreted as a regular non-windowed stream.
+	 *
+	 * <p>
+	 * Arriving data is incrementally aggregated using the given fold function.
+	 *
+	 * @param initialValue The initial value of the fold.
+	 * @param foldFunction The fold function that is used for incremental aggregation.
+	 * @param function The window function.
+	 * @return The data stream that is the result of applying the window function to the window.
+	 */
+	public <R> SingleOutputStreamOperator<R, ?> apply(R initialValue, FoldFunction<T, R> foldFunction, AllWindowFunction<R, R, W> function) {
+		TypeInformation<R> resultType = TypeExtractor.getFoldReturnTypes(foldFunction, input.getType(),
+			Utils.getCallLocationName(), true);
+
+		return apply(initialValue, foldFunction, function, resultType);
+	}
+
+	/**
+	 * Applies the given window function to each window. The window function is called for each
+	 * evaluation of the window for each key individually. The output of the window function is
+	 * interpreted as a regular non-windowed stream.
+	 *
+	 * <p>
+	 * Arriving data is incrementally aggregated using the given fold function.
+	 *
+	 * @param initialValue The initial value of the fold.
+	 * @param foldFunction The fold function that is used for incremental aggregation.
+	 * @param function The window function.
+	 * @param resultType Type information for the result type of the window function
+	 * @return The data stream that is the result of applying the window function to the window.
+	 */
+	public <R> SingleOutputStreamOperator<R, ?> apply(R initialValue, FoldFunction<T, R> foldFunction, AllWindowFunction<R, R, W> function, TypeInformation<R> resultType) {
+		if (foldFunction instanceof RichFunction) {
+			throw new UnsupportedOperationException("ReduceFunction of apply can not be a RichFunction.");
+		}
+
+		//clean the closures
+		function = input.getExecutionEnvironment().clean(function);
+		foldFunction = input.getExecutionEnvironment().clean(foldFunction);
+
+		String callLocation = Utils.getCallLocationName();
+		String udfName = "AllWindowedStream." + callLocation;
+
+		String opName;
+
+		OneInputStreamOperator<T, R> operator;
+
+		boolean setProcessingTime = input.getExecutionEnvironment().getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime;
+
+		if (evictor != null) {
+			opName = "NonParallelTriggerWindow(" + windowAssigner  + ", " + trigger + ", " + evictor + ", " + udfName + ")";
+
+			operator = new EvictingNonKeyedWindowOperator<>(windowAssigner,
+				windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
+				new HeapWindowBuffer.Factory<T>(),
+				new FoldApplyAllWindowFunction<>(initialValue, foldFunction, function),
+				trigger,
+				evictor).enableSetProcessingTime(setProcessingTime);
+
+		} else {
+			opName = "NonParallelTriggerWindow(" + windowAssigner  + ", " + trigger + ", " + udfName + ")";
+
+			operator = new NonKeyedWindowOperator<>(windowAssigner,
+				windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
+				new HeapWindowBuffer.Factory<T>(),
+				new FoldApplyAllWindowFunction<>(initialValue, foldFunction, function),
+				trigger).enableSetProcessingTime(setProcessingTime);
+		}
+
+		return input.transform(opName, resultType, operator).setParallelism(1);
+	}
+
 	// ------------------------------------------------------------------------
 	//  Aggregations on the  windows
 	// ------------------------------------------------------------------------


Mime
View raw message