flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aljos...@apache.org
Subject [2/3] flink git commit: [FLINK-5929] Allow Access to Per-Window State in ProcessWindowFunction
Date Sat, 25 Mar 2017 16:01:37 GMT
[FLINK-5929] Allow Access to Per-Window State in ProcessWindowFunction


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fad201bf
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fad201bf
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fad201bf

Branch: refs/heads/master
Commit: fad201bfb0b1f2757f68f7b3ffaf97a486eb93e8
Parents: 5c37e55
Author: Seth Wiesman <swiesman@mediamath.com>
Authored: Sun Mar 5 23:07:18 2017 -0500
Committer: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Committed: Sat Mar 25 16:59:17 2017 +0100

----------------------------------------------------------------------
 .../FoldApplyProcessAllWindowFunction.java      |  23 +-
 .../FoldApplyProcessWindowFunction.java         |  23 +-
 .../InternalProcessApplyAllWindowContext.java   |  57 +++++
 .../InternalProcessApplyWindowContext.java      |  58 +++++
 .../windowing/ProcessAllWindowFunction.java     |  22 ++
 .../windowing/ProcessWindowFunction.java        |  24 +-
 .../ReduceApplyProcessAllWindowFunction.java    |  23 +-
 .../ReduceApplyProcessWindowFunction.java       |  21 +-
 .../windowing/AccumulatingKeyedTimePanes.java   |  75 ++++++-
 .../windowing/EvictingWindowOperator.java       |  62 +++---
 .../operators/windowing/WindowOperator.java     | 220 +++++++++++++++----
 ...ternalAggregateProcessAllWindowFunction.java |  28 ++-
 .../InternalAggregateProcessWindowFunction.java |  28 ++-
 .../InternalIterableAllWindowFunction.java      |   7 +-
 ...nternalIterableProcessAllWindowFunction.java |  31 ++-
 .../InternalIterableProcessWindowFunction.java  |  24 +-
 .../InternalIterableWindowFunction.java         |   7 +-
 .../InternalProcessAllWindowContext.java        |  57 +++++
 .../functions/InternalProcessWindowContext.java |  58 +++++
 .../InternalSingleValueAllWindowFunction.java   |   7 +-
 ...rnalSingleValueProcessAllWindowFunction.java |  29 ++-
 ...nternalSingleValueProcessWindowFunction.java |  24 +-
 .../InternalSingleValueWindowFunction.java      |   7 +-
 .../functions/InternalWindowFunction.java       |  26 ++-
 .../FoldApplyProcessWindowFunctionTest.java     |  82 ++++++-
 .../functions/InternalWindowFunctionTest.java   |  49 +++--
 .../RegularWindowOperatorContractTest.java      |  12 +-
 .../windowing/WindowOperatorContractTest.java   | 158 ++++++++++---
 .../function/ProcessAllWindowFunction.scala     |  20 ++
 .../scala/function/ProcessWindowFunction.scala  |  20 ++
 .../ScalaProcessWindowFunctionWrapper.scala     |  31 +++
 31 files changed, 1091 insertions(+), 222 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/fad201bf/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessAllWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessAllWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessAllWindowFunction.java
index 5ac6766..8e8e52c 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessAllWindowFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessAllWindowFunction.java
@@ -50,6 +50,7 @@ public class FoldApplyProcessAllWindowFunction<W extends Window, T, ACC, R>
 	private TypeSerializer<ACC> accSerializer;
 	private final TypeInformation<ACC> accTypeInformation;
 	private transient ACC initialValue;
+	private transient InternalProcessApplyAllWindowContext<ACC, R, W> ctx;
 
 	public FoldApplyProcessAllWindowFunction(ACC initialValue, FoldFunction<T, ACC> foldFunction, ProcessAllWindowFunction<ACC, R, W> windowFunction, TypeInformation<ACC> accTypeInformation) {
 		this.windowFunction = windowFunction;
@@ -70,6 +71,9 @@ public class FoldApplyProcessAllWindowFunction<W extends Window, T, ACC, R>
 		ByteArrayInputStream bais = new ByteArrayInputStream(serializedInitialValue);
 		DataInputViewStreamWrapper in = new DataInputViewStreamWrapper(bais);
 		initialValue = accSerializer.deserialize(in);
+
+		ctx = new InternalProcessApplyAllWindowContext<>(windowFunction);
+
 	}
 
 	@Override
@@ -92,12 +96,19 @@ public class FoldApplyProcessAllWindowFunction<W extends Window, T, ACC, R>
 			result = foldFunction.fold(result, val);
 		}
 
-		windowFunction.process(windowFunction.new Context() {
-			@Override
-			public W window() {
-				return context.window();
-			}
-		}, Collections.singletonList(result), out);
+		this.ctx.window = context.window();
+		this.ctx.windowState = context.windowState();
+		this.ctx.globalState = context.globalState();
+
+		windowFunction.process(ctx, Collections.singletonList(result), out);
+	}
+
+	@Override
+	public void clear(final Context context) throws Exception {
+		this.ctx.window = context.window();
+		this.ctx.windowState = context.windowState();
+		this.ctx.globalState = context.globalState();
+		windowFunction.clear(ctx);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/fad201bf/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessWindowFunction.java
index e1bc759..073a2f3 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessWindowFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessWindowFunction.java
@@ -50,6 +50,7 @@ public class FoldApplyProcessWindowFunction<K, W extends Window, T, ACC, R>
 	private TypeSerializer<ACC> accSerializer;
 	private final TypeInformation<ACC> accTypeInformation;
 	private transient ACC initialValue;
+	private transient InternalProcessApplyWindowContext<ACC, R, K, W> ctx;
 
 	public FoldApplyProcessWindowFunction(ACC initialValue, FoldFunction<T, ACC> foldFunction, ProcessWindowFunction<ACC, R, K, W> windowFunction, TypeInformation<ACC> accTypeInformation) {
 		this.windowFunction = windowFunction;
@@ -70,6 +71,8 @@ public class FoldApplyProcessWindowFunction<K, W extends Window, T, ACC, R>
 		ByteArrayInputStream bais = new ByteArrayInputStream(serializedInitialValue);
 		DataInputViewStreamWrapper in = new DataInputViewStreamWrapper(bais);
 		initialValue = accSerializer.deserialize(in);
+
+		ctx = new InternalProcessApplyWindowContext<>(windowFunction);
 	}
 
 	@Override
@@ -85,19 +88,25 @@ public class FoldApplyProcessWindowFunction<K, W extends Window, T, ACC, R>
 	}
 
 	@Override
-	public void process(K key, final Context context, Iterable<T> values, Collector<R> out) throws Exception {
+	public void process(K key, Context context, Iterable<T> values, Collector<R> out) throws Exception {
 		ACC result = accSerializer.copy(initialValue);
 
 		for (T val : values) {
 			result = foldFunction.fold(result, val);
 		}
 
-		windowFunction.process(key, windowFunction.new Context() {
-			@Override
-			public W window() {
-				return context.window();
-			}
-		}, Collections.singletonList(result), out);
+		this.ctx.window = context.window();
+		this.ctx.windowState = context.windowState();
+		this.ctx.globalState = context.globalState();
+		windowFunction.process(key, ctx, Collections.singletonList(result), out);
+	}
+
+	@Override
+	public void clear(final Context context) throws Exception{
+		this.ctx.window = context.window();
+		this.ctx.windowState = context.windowState();
+		this.ctx.globalState = context.globalState();
+		windowFunction.clear(ctx);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/fad201bf/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/InternalProcessApplyAllWindowContext.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/InternalProcessApplyAllWindowContext.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/InternalProcessApplyAllWindowContext.java
new file mode 100644
index 0000000..e1a0a98
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/InternalProcessApplyAllWindowContext.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.functions.windowing;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.state.KeyedStateStore;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+
+/**
+ * Internal reusable context wrapper.
+ *
+ * @param <IN> The type of the input value.
+ * @param <OUT> The type of the output value.
+ * @param <W> The type of the window.
+ */
+@Internal
+public class InternalProcessApplyAllWindowContext<IN, OUT, W extends Window>
+	extends ProcessAllWindowFunction<IN, OUT, W>.Context {
+
+	W window;
+	KeyedStateStore windowState;
+	KeyedStateStore globalState;
+
+	InternalProcessApplyAllWindowContext(ProcessAllWindowFunction<IN, OUT, W> function) {
+		function.super();
+	}
+
+	@Override
+	public W window() {
+		return window;
+	}
+
+	@Override
+	public KeyedStateStore windowState() {
+		return windowState;
+	}
+
+	@Override
+	public KeyedStateStore globalState() {
+		return globalState;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/fad201bf/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/InternalProcessApplyWindowContext.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/InternalProcessApplyWindowContext.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/InternalProcessApplyWindowContext.java
new file mode 100644
index 0000000..f547adc
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/InternalProcessApplyWindowContext.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.functions.windowing;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.state.KeyedStateStore;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+
+/**
+ * Internal reusable context wrapper.
+ *
+ * @param <IN> The type of the input value.
+ * @param <OUT> The type of the output value.
+ * @param <KEY> The type of the window key. 
+ * @param <W> The type of the window.
+ */
+@Internal
+public class InternalProcessApplyWindowContext<IN, OUT, KEY, W extends Window>
+	extends ProcessWindowFunction<IN, OUT, KEY, W>.Context {
+
+	W window;
+	KeyedStateStore windowState;
+	KeyedStateStore globalState;
+
+	InternalProcessApplyWindowContext(ProcessWindowFunction<IN, OUT, KEY, W> function) {
+		function.super();
+	}
+
+	@Override
+	public W window() {
+		return window;
+	}
+
+	@Override
+	public KeyedStateStore windowState() {
+		return windowState;
+	}
+
+	@Override
+	public KeyedStateStore globalState() {
+		return globalState;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/fad201bf/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ProcessAllWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ProcessAllWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ProcessAllWindowFunction.java
index 622e020..f49aa27 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ProcessAllWindowFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ProcessAllWindowFunction.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.api.functions.windowing;
 
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.state.KeyedStateStore;
 import org.apache.flink.streaming.api.windowing.windows.Window;
 import org.apache.flink.util.Collector;
 
@@ -48,6 +49,14 @@ public abstract class ProcessAllWindowFunction<IN, OUT, W extends Window> implem
 	public abstract void process(Context context, Iterable<IN> elements, Collector<OUT> out) throws Exception;
 
 	/**
+	 * Deletes any state in the {@code Context} when the Window is purged.
+	 *
+	 * @param context The context in which the window is being evaluated
+	 * @throws Exception The function may throw exceptions to fail the program and trigger recovery.
+	 */
+	public void clear(Context context) throws Exception {}
+
+	/**
 	 * The context holding window metadata
 	 */
 	public abstract class Context {
@@ -55,5 +64,18 @@ public abstract class ProcessAllWindowFunction<IN, OUT, W extends Window> implem
 		 * @return The window that is being evaluated.
 		 */
 		public abstract W window();
+
+		/**
+		 * State accessor for per-key and per-window state.
+		 *
+		 * <p><b>NOTE:</b> If you use per-window state you have to ensure that you clean it up
+		 * by implementing {@link ProcessAllWindowFunction#clear(ProcessAllWindowFunction.Context)}.
+		 */
+		public abstract KeyedStateStore windowState();
+
+		/**
+		 * State accessor for per-key global state.
+		 */
+		public abstract KeyedStateStore globalState();
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/fad201bf/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ProcessWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ProcessWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ProcessWindowFunction.java
index 9c48e24..bcefaf7 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ProcessWindowFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ProcessWindowFunction.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.api.functions.windowing;
 
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.state.KeyedStateStore;
 import org.apache.flink.streaming.api.windowing.windows.Window;
 import org.apache.flink.util.Collector;
 
@@ -50,12 +51,33 @@ public abstract class ProcessWindowFunction<IN, OUT, KEY, W extends Window> impl
 	public abstract void process(KEY key, Context context, Iterable<IN> elements, Collector<OUT> out) throws Exception;
 
 	/**
+	 * Deletes any state in the {@code Context} when the Window is purged.
+	 *
+	 * @param context The context in which the window is being evaluated
+	 * @throws Exception The function may throw exceptions to fail the program and trigger recovery.
+	 */
+	public void clear(Context context) throws Exception {}
+
+	/**
 	 * The context holding window metadata
 	 */
-	public abstract class Context {
+	public abstract class Context implements java.io.Serializable {
 		/**
 		 * @return The window that is being evaluated.
 		 */
 		public abstract W window();
+
+		/**
+		 * State accessor for per-key and per-window state.
+		 *
+		 * <p><b>NOTE:</b> If you use per-window state you have to ensure that you clean it up
+		 * by implementing {@link ProcessWindowFunction#clear(Context)}.
+		 */
+		public abstract KeyedStateStore windowState();
+
+		/**
+		 * State accessor for per-key global state.
+		 */
+		public abstract KeyedStateStore globalState();
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/fad201bf/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyProcessAllWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyProcessAllWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyProcessAllWindowFunction.java
index 142c71e..4c54c94 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyProcessAllWindowFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyProcessAllWindowFunction.java
@@ -35,6 +35,7 @@ public class ReduceApplyProcessAllWindowFunction<W extends Window, T, R>
 
 	private final ReduceFunction<T> reduceFunction;
 	private final ProcessAllWindowFunction<T, R, W> windowFunction;
+	private transient InternalProcessApplyAllWindowContext<T, R, W> ctx;
 
 	public ReduceApplyProcessAllWindowFunction(ReduceFunction<T> reduceFunction, ProcessAllWindowFunction<T, R, W> windowFunction) {
 		this.windowFunction = windowFunction;
@@ -52,17 +53,27 @@ public class ReduceApplyProcessAllWindowFunction<W extends Window, T, R>
 				curr = reduceFunction.reduce(curr, val);
 			}
 		}
-		windowFunction.process(windowFunction.new Context() {
-			@Override
-			public W window() {
-				return context.window();
-			}
-		}, Collections.singletonList(curr), out);
+
+		this.ctx.window = context.window();
+		this.ctx.windowState = context.windowState();
+		this.ctx.globalState = context.globalState();
+
+		windowFunction.process(ctx, Collections.singletonList(curr), out);
+	}
+
+	@Override
+	public void clear(final Context context) throws Exception {
+		this.ctx.window = context.window();
+		this.ctx.windowState = context.windowState();
+		this.ctx.globalState = context.globalState();
+
+		windowFunction.clear(ctx);
 	}
 
 	@Override
 	public void open(Configuration configuration) throws Exception {
 		FunctionUtils.openFunction(this.windowFunction, configuration);
+		ctx = new InternalProcessApplyAllWindowContext<>(windowFunction);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/fad201bf/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyProcessWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyProcessWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyProcessWindowFunction.java
index 9ea1fdf..1af783a 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyProcessWindowFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyProcessWindowFunction.java
@@ -35,6 +35,7 @@ public class ReduceApplyProcessWindowFunction<K, W extends Window, T, R>
 
 	private final ReduceFunction<T> reduceFunction;
 	private final ProcessWindowFunction<T, R, K, W> windowFunction;
+	private transient InternalProcessApplyWindowContext<T, R, K, W> ctx;
 
 	public ReduceApplyProcessWindowFunction(ReduceFunction<T> reduceFunction, ProcessWindowFunction<T, R, K, W> windowFunction) {
 		this.windowFunction = windowFunction;
@@ -52,17 +53,25 @@ public class ReduceApplyProcessWindowFunction<K, W extends Window, T, R>
 				curr = reduceFunction.reduce(curr, val);
 			}
 		}
-		windowFunction.process(k, windowFunction.new Context() {
-			@Override
-			public W window() {
-				return context.window();
-			}
-		}, Collections.singletonList(curr), out);
+
+		this.ctx.window = context.window();
+		this.ctx.windowState = context.windowState();
+		this.ctx.globalState = context.globalState();
+		windowFunction.process(k, ctx, Collections.singletonList(curr), out);
+	}
+
+	@Override
+	public void clear(final Context context) throws Exception {
+		this.ctx.window = context.window();
+		this.ctx.windowState = context.windowState();
+		this.ctx.globalState = context.globalState();
+		windowFunction.clear(ctx);
 	}
 
 	@Override
 	public void open(Configuration configuration) throws Exception {
 		FunctionUtils.openFunction(this.windowFunction, configuration);
+		ctx = new InternalProcessApplyWindowContext<>(windowFunction);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/fad201bf/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java
index 87c5aca..d58b5cc 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java
@@ -19,6 +19,17 @@
 package org.apache.flink.streaming.runtime.operators.windowing;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.state.FoldingState;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
+import org.apache.flink.api.common.state.KeyedStateStore;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ReducingState;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalWindowFunction;
 import org.apache.flink.util.UnionIterator;
@@ -38,6 +49,8 @@ public class AccumulatingKeyedTimePanes<Type, Key, Result> extends AbstractKeyed
 
 	private final InternalWindowFunction<Iterable<Type>, Result, Key, Window> function;
 
+	private final AccumulatingKeyedTimePanesContext context;
+
 	/**
 	 * IMPORTANT: This value needs to start at one, so it is fresher than the value that new entries have (zero) */
 	private long evaluationPass = 1L;   
@@ -47,6 +60,7 @@ public class AccumulatingKeyedTimePanes<Type, Key, Result> extends AbstractKeyed
 	public AccumulatingKeyedTimePanes(KeySelector<Type, Key> keySelector, InternalWindowFunction<Iterable<Type>, Result, Key, Window> function) {
 		this.keySelector = keySelector;
 		this.function = function;
+		this.context = new AccumulatingKeyedTimePanesContext();
 	}
 
 	// ------------------------------------------------------------------------
@@ -67,13 +81,15 @@ public class AccumulatingKeyedTimePanes<Type, Key, Result> extends AbstractKeyed
 			for (KeyMap.Entry<Key, ArrayList<Type>> entry : latestPane) {
 				Key key = entry.getKey();
 				operator.setCurrentKey(key);
-				function.apply(entry.getKey(), window, entry.getValue(), out);
+				context.globalState = operator.getKeyedStateStore();
+
+				function.process(entry.getKey(), window, context, entry.getValue(), out);
 			}
 		}
 		else {
 			// general code path for multi-pane case
 			WindowFunctionTraversal<Key, Type, Result> evaluator = new WindowFunctionTraversal<>(
-					function, window, out, operator);
+					function, window, out, operator, context);
 			traverseAllPanes(evaluator, evaluationPass);
 		}
 		
@@ -95,17 +111,19 @@ public class AccumulatingKeyedTimePanes<Type, Key, Result> extends AbstractKeyed
 		private final TimeWindow window;
 		
 		private final AbstractStreamOperator<Result> contextOperator;
-		
+
 		private Key currentKey;
 		
+		private AccumulatingKeyedTimePanesContext context;
 
 		WindowFunctionTraversal(InternalWindowFunction<Iterable<Type>, Result, Key, Window> function, TimeWindow window,
-								Collector<Result> out, AbstractStreamOperator<Result> contextOperator) {
+								Collector<Result> out, AbstractStreamOperator<Result> contextOperator, AccumulatingKeyedTimePanesContext context) {
 			this.function = function;
 			this.out = out;
 			this.unionIterator = new UnionIterator<>();
 			this.window = window;
 			this.contextOperator = contextOperator;
+			this.context = context;
 		}
 
 
@@ -123,7 +141,8 @@ public class AccumulatingKeyedTimePanes<Type, Key, Result> extends AbstractKeyed
 		@Override
 		public void keyDone() throws Exception {
 			contextOperator.setCurrentKey(currentKey);
-			function.apply(currentKey, window, unionIterator, out);
+			context.globalState = contextOperator.getKeyedStateStore();
+			function.process(currentKey, window, context, unionIterator, out);
 		}
 	}
 	
@@ -136,6 +155,52 @@ public class AccumulatingKeyedTimePanes<Type, Key, Result> extends AbstractKeyed
 		return (KeyMap.LazyFactory<ArrayList<V>>) LIST_FACTORY;
 	}
 
+	private static class ThrowingKeyedStateStore implements KeyedStateStore {
+		@Override
+		public <T> ValueState<T> getState(ValueStateDescriptor<T> stateProperties) {
+			throw new UnsupportedOperationException("Per-window state is not supported when using aligned processing-time windows.");
+		}
+
+		@Override
+		public <T> ListState<T> getListState(ListStateDescriptor<T> stateProperties) {
+			throw new UnsupportedOperationException("Per-window state is not supported when using aligned processing-time windows.");
+		}
+
+		@Override
+		public <T> ReducingState<T> getReducingState(ReducingStateDescriptor<T> stateProperties) {
+			throw new UnsupportedOperationException("Per-window state is not supported when using aligned processing-time windows.");
+		}
+
+		@Override
+		public <T, A> FoldingState<T, A> getFoldingState(FoldingStateDescriptor<T, A> stateProperties) {
+			throw new UnsupportedOperationException("Per-window state is not supported when using aligned processing-time windows.");
+		}
+
+		@Override
+		public <UK, UV> MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV> stateProperties) {
+			throw new UnsupportedOperationException("Per-window state is not supported when using aligned processing-time windows.");
+		}
+	}
+
+	private static class AccumulatingKeyedTimePanesContext implements InternalWindowFunction.InternalWindowContext {
+		KeyedStateStore globalState;
+		KeyedStateStore throwingStore;
+
+		public AccumulatingKeyedTimePanesContext() {
+			this.throwingStore = new ThrowingKeyedStateStore();
+		}
+
+		@Override
+		public KeyedStateStore windowState() {
+			return throwingStore;
+		}
+
+		@Override
+		public KeyedStateStore globalState() {
+			return globalState;
+		}
+	}
+
 	private static final KeyMap.LazyFactory<?> LIST_FACTORY = new KeyMap.LazyFactory<ArrayList<?>>() {
 
 		@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/fad201bf/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
index 24c8d32..85451a5 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
@@ -134,14 +134,14 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window>
 											" window: " + mergeResult);
 								}
 
-								context.key = key;
-								context.window = mergeResult;
+								triggerContext.key = key;
+								triggerContext.window = mergeResult;
 
-								context.onMerge(mergedWindows);
+								triggerContext.onMerge(mergedWindows);
 
 								for (W m : mergedWindows) {
-									context.window = m;
-									context.clear();
+									triggerContext.window = m;
+									triggerContext.clear();
 									deleteCleanupTimer(m);
 								}
 
@@ -165,12 +165,12 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window>
 				evictingWindowState.setCurrentNamespace(stateWindow);
 				evictingWindowState.add(element);
 
-				context.key = key;
-				context.window = actualWindow;
+				triggerContext.key = key;
+				triggerContext.window = actualWindow;
 				evictorContext.key = key;
 				evictorContext.window = actualWindow;
 
-				TriggerResult triggerResult = context.onElement(element);
+				TriggerResult triggerResult = triggerContext.onElement(element);
 
 				if (triggerResult.isFire()) {
 					Iterable<StreamRecord<IN>> contents = evictingWindowState.get();
@@ -201,12 +201,12 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window>
 				evictingWindowState.setCurrentNamespace(window);
 				evictingWindowState.add(element);
 
-				context.key = key;
-				context.window = window;
+				triggerContext.key = key;
+				triggerContext.window = window;
 				evictorContext.key = key;
 				evictorContext.window = window;
 
-				TriggerResult triggerResult = context.onElement(element);
+				TriggerResult triggerResult = triggerContext.onElement(element);
 
 				if (triggerResult.isFire()) {
 					Iterable<StreamRecord<IN>> contents = evictingWindowState.get();
@@ -236,8 +236,8 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window>
 	@Override
 	public void onEventTime(InternalTimer<K, W> timer) throws Exception {
 
-		context.key = timer.getKey();
-		context.window = timer.getNamespace();
+		triggerContext.key = timer.getKey();
+		triggerContext.window = timer.getNamespace();
 		evictorContext.key = timer.getKey();
 		evictorContext.window = timer.getNamespace();
 
@@ -245,7 +245,7 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window>
 
 		if (windowAssigner instanceof MergingWindowAssigner) {
 			mergingWindows = getMergingWindowSet();
-			W stateWindow = mergingWindows.getStateWindow(context.window);
+			W stateWindow = mergingWindows.getStateWindow(triggerContext.window);
 			if (stateWindow == null) {
 				// Timer firing for non-existent window, this can only happen if a
 				// trigger did not clean up timers. We have already cleared the merging
@@ -255,23 +255,23 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window>
 				evictingWindowState.setCurrentNamespace(stateWindow);
 			}
 		} else {
-			evictingWindowState.setCurrentNamespace(context.window);
+			evictingWindowState.setCurrentNamespace(triggerContext.window);
 		}
 
 		Iterable<StreamRecord<IN>> contents = evictingWindowState.get();
 
 		if (contents != null) {
-			TriggerResult triggerResult = context.onEventTime(timer.getTimestamp());
+			TriggerResult triggerResult = triggerContext.onEventTime(timer.getTimestamp());
 			if (triggerResult.isFire()) {
-				emitWindowContents(context.window, contents, evictingWindowState);
+				emitWindowContents(triggerContext.window, contents, evictingWindowState);
 			}
 			if (triggerResult.isPurge()) {
 				evictingWindowState.clear();
 			}
 		}
 
-		if (windowAssigner.isEventTime() && isCleanupTime(context.window, timer.getTimestamp())) {
-			clearAllState(context.window, evictingWindowState, mergingWindows);
+		if (windowAssigner.isEventTime() && isCleanupTime(triggerContext.window, timer.getTimestamp())) {
+			clearAllState(triggerContext.window, evictingWindowState, mergingWindows);
 		}
 
 		if (mergingWindows != null) {
@@ -282,8 +282,8 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window>
 
 	@Override
 	public void onProcessingTime(InternalTimer<K, W> timer) throws Exception {
-		context.key = timer.getKey();
-		context.window = timer.getNamespace();
+		triggerContext.key = timer.getKey();
+		triggerContext.window = timer.getNamespace();
 		evictorContext.key = timer.getKey();
 		evictorContext.window = timer.getNamespace();
 
@@ -291,7 +291,7 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window>
 
 		if (windowAssigner instanceof MergingWindowAssigner) {
 			mergingWindows = getMergingWindowSet();
-			W stateWindow = mergingWindows.getStateWindow(context.window);
+			W stateWindow = mergingWindows.getStateWindow(triggerContext.window);
 			if (stateWindow == null) {
 				// Timer firing for non-existent window, this can only happen if a
 				// trigger did not clean up timers. We have already cleared the merging
@@ -301,23 +301,23 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window>
 				evictingWindowState.setCurrentNamespace(stateWindow);
 			}
 		} else {
-			evictingWindowState.setCurrentNamespace(context.window);
+			evictingWindowState.setCurrentNamespace(triggerContext.window);
 		}
 
 		Iterable<StreamRecord<IN>> contents = evictingWindowState.get();
 
 		if (contents != null) {
-			TriggerResult triggerResult = context.onProcessingTime(timer.getTimestamp());
+			TriggerResult triggerResult = triggerContext.onProcessingTime(timer.getTimestamp());
 			if (triggerResult.isFire()) {
-				emitWindowContents(context.window, contents, evictingWindowState);
+				emitWindowContents(triggerContext.window, contents, evictingWindowState);
 			}
 			if (triggerResult.isPurge()) {
 				evictingWindowState.clear();
 			}
 		}
 
-		if (!windowAssigner.isEventTime() && isCleanupTime(context.window, timer.getTimestamp())) {
-			clearAllState(context.window, evictingWindowState, mergingWindows);
+		if (!windowAssigner.isEventTime() && isCleanupTime(triggerContext.window, timer.getTimestamp())) {
+			clearAllState(triggerContext.window, evictingWindowState, mergingWindows);
 		}
 
 		if (mergingWindows != null) {
@@ -348,7 +348,8 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window>
 				}
 			});
 
-		userFunction.apply(context.key, context.window, projectedContents, timestampedCollector);
+		processContext.window = triggerContext.window;
+		userFunction.process(triggerContext.key, triggerContext.window, processContext, projectedContents, timestampedCollector);
 		evictorContext.evictAfter(recordsWithTimestamp, Iterables.size(recordsWithTimestamp));
 
 
@@ -364,9 +365,10 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window>
 			W window,
 			ListState<StreamRecord<IN>> windowState,
 			MergingWindowSet<W> mergingWindows) throws Exception {
-
 		windowState.clear();
-		context.clear();
+		triggerContext.clear();
+		processContext.window = window;
+		processContext.clear();
 		if (mergingWindows != null) {
 			mergingWindows.retireWindow(window);
 			mergingWindows.persist();

http://git-wip-us.apache.org/repos/asf/flink/blob/fad201bf/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
index 3745659..3d40716 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
@@ -23,8 +23,16 @@ import org.apache.commons.math3.util.ArithmeticUtils;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.state.AppendingState;
+import org.apache.flink.api.common.state.FoldingState;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
+import org.apache.flink.api.common.state.KeyedStateStore;
+import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
 import org.apache.flink.api.common.state.MergingState;
+import org.apache.flink.api.common.state.ReducingState;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
 import org.apache.flink.api.common.state.State;
 import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.state.ValueState;
@@ -159,7 +167,9 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 	/** This is given to the {@code InternalWindowFunction} for emitting elements with a given timestamp. */
 	protected transient TimestampedCollector<OUT> timestampedCollector;
 
-	protected transient Context context = new Context(null, null);
+	protected transient Context triggerContext = new Context(null, null);
+
+	protected transient WindowContext processContext = new WindowContext(null);
 
 	protected transient WindowAssigner.WindowAssignerContext windowAssignerContext;
 
@@ -264,7 +274,8 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 		internalTimerService =
 				getInternalTimerService("window-timers", windowSerializer, this);
 
-		context = new Context(null, null);
+		triggerContext = new Context(null, null);
+		processContext = new WindowContext( null);
 
 		windowAssignerContext = new WindowAssigner.WindowAssignerContext() {
 			@Override
@@ -317,7 +328,8 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 	public void close() throws Exception {
 		super.close();
 		timestampedCollector = null;
-		context = null;
+		triggerContext = null;
+		processContext = null;
 		windowAssignerContext = null;
 	}
 
@@ -325,7 +337,8 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 	public void dispose() throws Exception {
 		super.dispose();
 		timestampedCollector = null;
-		context = null;
+		triggerContext = null;
+		processContext = null;
 		windowAssignerContext = null;
 	}
 
@@ -365,14 +378,14 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 									" window: " + mergeResult);
 						}
 
-						context.key = key;
-						context.window = mergeResult;
+						triggerContext.key = key;
+						triggerContext.window = mergeResult;
 
-						context.onMerge(mergedWindows);
+						triggerContext.onMerge(mergedWindows);
 
 						for (W m: mergedWindows) {
-							context.window = m;
-							context.clear();
+							triggerContext.window = m;
+							triggerContext.clear();
 							deleteCleanupTimer(m);
 						}
 
@@ -396,10 +409,10 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 				windowState.setCurrentNamespace(stateWindow);
 				windowState.add(element.getValue());
 
-				context.key = key;
-				context.window = actualWindow;
+				triggerContext.key = key;
+				triggerContext.window = actualWindow;
 
-				TriggerResult triggerResult = context.onElement(element);
+				TriggerResult triggerResult = triggerContext.onElement(element);
 
 				if (triggerResult.isFire()) {
 					ACC contents = windowState.get();
@@ -429,10 +442,10 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 				windowState.setCurrentNamespace(window);
 				windowState.add(element.getValue());
 
-				context.key = key;
-				context.window = window;
+				triggerContext.key = key;
+				triggerContext.window = window;
 
-				TriggerResult triggerResult = context.onElement(element);
+				TriggerResult triggerResult = triggerContext.onElement(element);
 
 				if (triggerResult.isFire()) {
 					ACC contents = windowState.get();
@@ -460,14 +473,14 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 
 	@Override
 	public void onEventTime(InternalTimer<K, W> timer) throws Exception {
-		context.key = timer.getKey();
-		context.window = timer.getNamespace();
+		triggerContext.key = timer.getKey();
+		triggerContext.window = timer.getNamespace();
 
 		MergingWindowSet<W> mergingWindows;
 
 		if (windowAssigner instanceof MergingWindowAssigner) {
 			mergingWindows = getMergingWindowSet();
-			W stateWindow = mergingWindows.getStateWindow(context.window);
+			W stateWindow = mergingWindows.getStateWindow(triggerContext.window);
 			if (stateWindow == null) {
 				// Timer firing for non-existent window, this can only happen if a
 				// trigger did not clean up timers. We have already cleared the merging
@@ -477,7 +490,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 				windowState.setCurrentNamespace(stateWindow);
 			}
 		} else {
-			windowState.setCurrentNamespace(context.window);
+			windowState.setCurrentNamespace(triggerContext.window);
 			mergingWindows = null;
 		}
 
@@ -487,17 +500,17 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 		}
 
 		if (contents != null) {
-			TriggerResult triggerResult = context.onEventTime(timer.getTimestamp());
+			TriggerResult triggerResult = triggerContext.onEventTime(timer.getTimestamp());
 			if (triggerResult.isFire()) {
-				emitWindowContents(context.window, contents);
+				emitWindowContents(triggerContext.window, contents);
 			}
 			if (triggerResult.isPurge()) {
 				windowState.clear();
 			}
 		}
 
-		if (windowAssigner.isEventTime() && isCleanupTime(context.window, timer.getTimestamp())) {
-			clearAllState(context.window, windowState, mergingWindows);
+		if (windowAssigner.isEventTime() && isCleanupTime(triggerContext.window, timer.getTimestamp())) {
+			clearAllState(triggerContext.window, windowState, mergingWindows);
 		}
 
 		if (mergingWindows != null) {
@@ -508,14 +521,14 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 
 	@Override
 	public void onProcessingTime(InternalTimer<K, W> timer) throws Exception {
-		context.key = timer.getKey();
-		context.window = timer.getNamespace();
+		triggerContext.key = timer.getKey();
+		triggerContext.window = timer.getNamespace();
 
 		MergingWindowSet<W> mergingWindows;
 
 		if (windowAssigner instanceof MergingWindowAssigner) {
 			mergingWindows = getMergingWindowSet();
-			W stateWindow = mergingWindows.getStateWindow(context.window);
+			W stateWindow = mergingWindows.getStateWindow(triggerContext.window);
 			if (stateWindow == null) {
 				// Timer firing for non-existent window, this can only happen if a
 				// trigger did not clean up timers. We have already cleared the merging
@@ -525,7 +538,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 				windowState.setCurrentNamespace(stateWindow);
 			}
 		} else {
-			windowState.setCurrentNamespace(context.window);
+			windowState.setCurrentNamespace(triggerContext.window);
 			mergingWindows = null;
 		}
 
@@ -535,17 +548,17 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 		}
 
 		if (contents != null) {
-			TriggerResult triggerResult = context.onProcessingTime(timer.getTimestamp());
+			TriggerResult triggerResult = triggerContext.onProcessingTime(timer.getTimestamp());
 			if (triggerResult.isFire()) {
-				emitWindowContents(context.window, contents);
+				emitWindowContents(triggerContext.window, contents);
 			}
 			if (triggerResult.isPurge()) {
 				windowState.clear();
 			}
 		}
 
-		if (!windowAssigner.isEventTime() && isCleanupTime(context.window, timer.getTimestamp())) {
-			clearAllState(context.window, windowState, mergingWindows);
+		if (!windowAssigner.isEventTime() && isCleanupTime(triggerContext.window, timer.getTimestamp())) {
+			clearAllState(triggerContext.window, windowState, mergingWindows);
 		}
 
 		if (mergingWindows != null) {
@@ -559,14 +572,16 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 	 * {@link Trigger#clear(Window, Trigger.TriggerContext)}.
 	 *
 	 * <p>The caller must ensure that the
-	 * correct key is set in the state backend and the context object.
+	 * correct key is set in the state backend and the triggerContext object.
 	 */
 	private void clearAllState(
 			W window,
 			AppendingState<IN, ACC> windowState,
 			MergingWindowSet<W> mergingWindows) throws Exception {
 		windowState.clear();
-		context.clear();
+		triggerContext.clear();
+		processContext.window = window;
+		processContext.clear();
 		if (mergingWindows != null) {
 			mergingWindows.retireWindow(window);
 			mergingWindows.persist();
@@ -579,7 +594,8 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 	@SuppressWarnings("unchecked")
 	private void emitWindowContents(W window, ACC contents) throws Exception {
 		timestampedCollector.setAbsoluteTimestamp(window.maxTimestamp());
-		userFunction.apply(context.key, context.window, contents, timestampedCollector);
+		processContext.window = window;
+		userFunction.process(triggerContext.key, window, processContext, contents, timestampedCollector);
 	}
 
 	/**
@@ -636,9 +652,9 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 		}
 
 		if (windowAssigner.isEventTime()) {
-			context.registerEventTimeTimer(cleanupTime);
+			triggerContext.registerEventTimeTimer(cleanupTime);
 		} else {
-			context.registerProcessingTimeTimer(cleanupTime);
+			triggerContext.registerProcessingTimeTimer(cleanupTime);
 		}
 	}
 
@@ -654,9 +670,9 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 			return;
 		}
 		if (windowAssigner.isEventTime()) {
-			context.deleteEventTimeTimer(cleanupTime);
+			triggerContext.deleteEventTimeTimer(cleanupTime);
 		} else {
-			context.deleteProcessingTimeTimer(cleanupTime);
+			triggerContext.deleteProcessingTimeTimer(cleanupTime);
 		}
 	}
 
@@ -686,6 +702,134 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 	}
 
 	/**
+	 * Base class for per-window {@link KeyedStateStore KeyedStateStores}. Used to allow per-window
+	 * state access for {@link org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction}.
+	 */
+	public abstract class AbstractPerWindowStateStore implements KeyedStateStore {
+
+		// we have this in the base class even though it's not used in MergingWindowStateStore so that
+		// we can always set it and ignore what actual implementation we have
+		protected W window;
+	}
+
+	/**
+	 * Special {@link AbstractPerWindowStateStore} that doesn't allow access to per-window state.
+	 */
+	public class MergingWindowStateStore extends AbstractPerWindowStateStore {
+		@Override
+		public <T> ValueState<T> getState(ValueStateDescriptor<T> stateProperties) {
+			throw new UnsupportedOperationException("Per-window state is not allowed when using merging windows.");
+		}
+
+		@Override
+		public <T> ListState<T> getListState(ListStateDescriptor<T> stateProperties) {
+			throw new UnsupportedOperationException("Per-window state is not allowed when using merging windows.");
+		}
+
+		@Override
+		public <T> ReducingState<T> getReducingState(ReducingStateDescriptor<T> stateProperties) {
+			throw new UnsupportedOperationException("Per-window state is not allowed when using merging windows.");
+		}
+
+		@Override
+		public <T, A> FoldingState<T, A> getFoldingState(FoldingStateDescriptor<T, A> stateProperties) {
+			throw new UnsupportedOperationException("Per-window state is not allowed when using merging windows.");
+		}
+
+		@Override
+		public <UK, UV> MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV> stateProperties) {
+			throw new UnsupportedOperationException("Per-window state is not allowed when using merging windows.");
+		}
+	}
+
+	/**
+	 * Regular per-window state store for use with
+	 * {@link org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction}.
+	 */
+	public class PerWindowStateStore extends AbstractPerWindowStateStore {
+		@Override
+		public <T> ValueState<T> getState(ValueStateDescriptor<T> stateProperties) {
+			try {
+				return WindowOperator.this.getPartitionedState(window, windowSerializer, stateProperties);
+			} catch (Exception e) {
+				throw new RuntimeException("Could not retrieve state", e);
+			}
+		}
+
+		@Override
+		public <T> ListState<T> getListState(ListStateDescriptor<T> stateProperties) {
+			try {
+				return WindowOperator.this.getPartitionedState(window, windowSerializer, stateProperties);
+			} catch (Exception e) {
+				throw new RuntimeException("Could not retrieve state", e);
+			}
+		}
+
+		@Override
+		public <T> ReducingState<T> getReducingState(ReducingStateDescriptor<T> stateProperties) {
+			try {
+				return WindowOperator.this.getPartitionedState(window, windowSerializer, stateProperties);
+			} catch (Exception e) {
+				throw new RuntimeException("Could not retrieve state", e);
+			}
+		}
+
+		@Override
+		public <T, ACC> FoldingState<T, ACC> getFoldingState(FoldingStateDescriptor<T, ACC> stateProperties) {
+			try {
+				return WindowOperator.this.getPartitionedState(window, windowSerializer, stateProperties);
+			} catch (Exception e) {
+				throw new RuntimeException("Could not retrieve state", e);
+			}
+		}
+
+		@Override
+		public <UK, UV> MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV> stateProperties) {
+			try {
+				return WindowOperator.this.getPartitionedState(window, windowSerializer, stateProperties);
+			} catch (Exception e) {
+				throw new RuntimeException("Could not retrieve state", e);
+			}
+		}
+	}
+
+	/**
+	 * A utility class for handling {@code ProcessWindowFunction} invocations. This can be reused
+	 * by setting the {@code window} field. No internal state must be kept in the
+	 * {@code WindowContext}.
+	 */
+	public class WindowContext implements InternalWindowFunction.InternalWindowContext {
+		protected W window;
+
+		protected AbstractPerWindowStateStore windowState;
+
+		public WindowContext(W window) {
+			this.window = window;
+			this.windowState = windowAssigner instanceof MergingWindowAssigner ?  new MergingWindowStateStore() : new PerWindowStateStore();
+		}
+
+		@Override
+		public String toString() {
+			return "WindowContext{Window = " + window.toString() + "}";
+		}
+
+		public void clear() throws Exception {
+			userFunction.clear(window, this);
+		}
+
+		@Override
+		public KeyedStateStore windowState() {
+			this.windowState.window = this.window;
+			return this.windowState;
+		}
+
+		@Override
+		public KeyedStateStore globalState() {
+			return WindowOperator.this.getKeyedStateStore();
+		}
+	}
+
+	/**
 	 * {@code Context} is a utility for handling {@code Trigger} invocations. It can be reused
 	 * by setting the {@code key} and {@code window} fields. No internal state must be kept in
 	 * the {@code Context}

http://git-wip-us.apache.org/repos/asf/flink/blob/fad201bf/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalAggregateProcessAllWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalAggregateProcessAllWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalAggregateProcessAllWindowFunction.java
index 9533c95..83e896d 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalAggregateProcessAllWindowFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalAggregateProcessAllWindowFunction.java
@@ -21,6 +21,7 @@ import org.apache.flink.api.common.functions.AggregateFunction;
 import org.apache.flink.api.common.functions.IterationRuntimeContext;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.java.operators.translation.WrappingFunction;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
 import org.apache.flink.streaming.api.windowing.windows.Window;
 import org.apache.flink.util.Collector;
@@ -45,6 +46,8 @@ public final class InternalAggregateProcessAllWindowFunction<T, ACC, V, R, W ext
 
 	private final AggregateFunction<T, ACC, V> aggFunction;
 
+	private transient InternalProcessAllWindowContext<V, R, W> ctx;
+
 	public InternalAggregateProcessAllWindowFunction(
 			AggregateFunction<T, ACC, V> aggFunction,
 			ProcessAllWindowFunction<V, R, W> windowFunction) {
@@ -53,22 +56,31 @@ public final class InternalAggregateProcessAllWindowFunction<T, ACC, V, R, W ext
 	}
 
 	@Override
-	public void apply(Byte key, final W window, Iterable<T> input, Collector<R> out) throws Exception {
+	public void open(Configuration parameters) throws Exception {
+		super.open(parameters);
 		ProcessAllWindowFunction<V, R, W> wrappedFunction = this.wrappedFunction;
-		ProcessAllWindowFunction<V, R, W>.Context context = wrappedFunction.new Context() {
-			@Override
-			public W window() {
-				return window;
-			}
-		};
+		this.ctx = new InternalProcessAllWindowContext<>(wrappedFunction);
+	}
 
+	@Override
+	public void process(Byte key, final W window, final InternalWindowContext context, Iterable<T> input, Collector<R> out) throws Exception {
 		final ACC acc = aggFunction.createAccumulator();
 
 		for (T val : input) {
 			aggFunction.add(val, acc);
 		}
 
-		wrappedFunction.process(context, Collections.singletonList(aggFunction.getResult(acc)), out);
+		this.ctx.window = window;
+		this.ctx.internalContext = context;
+		ProcessAllWindowFunction<V, R, W> wrappedFunction = this.wrappedFunction;
+		wrappedFunction.process(ctx, Collections.singletonList(aggFunction.getResult(acc)), out);
+	}
+
+	@Override
+	public void clear(final W window, final InternalWindowContext context) throws Exception {
+		this.ctx.window = window;
+		this.ctx.internalContext = context;
+		wrappedFunction.clear(ctx);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/fad201bf/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalAggregateProcessWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalAggregateProcessWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalAggregateProcessWindowFunction.java
index 433da9b..e14c9bd 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalAggregateProcessWindowFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalAggregateProcessWindowFunction.java
@@ -46,30 +46,36 @@ public final class InternalAggregateProcessWindowFunction<T, ACC, V, R, K, W ext
 
 	private final AggregateFunction<T, ACC, V> aggFunction;
 
+	private final InternalProcessWindowContext<V, R, K, W> ctx;
+
 	public InternalAggregateProcessWindowFunction(
 			AggregateFunction<T, ACC, V> aggFunction,
 			ProcessWindowFunction<V, R, K, W> windowFunction) {
 		super(windowFunction);
 		this.aggFunction = aggFunction;
+		this.ctx = new InternalProcessWindowContext<>(windowFunction);
 	}
-	
-	@Override
-	public void apply(K key, final W window, Iterable<T> input, Collector<R> out) throws Exception {
-		ProcessWindowFunction<V, R, K, W> wrappedFunction = this.wrappedFunction;
-		ProcessWindowFunction<V, R, K, W>.Context context = wrappedFunction.new Context() {
-			@Override
-			public W window() {
-				return window;
-			}
-		};
 
+	@Override
+	public void process(K key, final W window, final InternalWindowContext context, Iterable<T> input, Collector<R> out) throws Exception {
 		final ACC acc = aggFunction.createAccumulator();
 
 		for (T val : input) {
 			aggFunction.add(val, acc);
 		}
 
-		wrappedFunction.process(key, context, Collections.singletonList(aggFunction.getResult(acc)), out);
+		this.ctx.window = window;
+		this.ctx.internalContext = context;
+		ProcessWindowFunction<V, R, K, W> wrappedFunction = this.wrappedFunction;
+		wrappedFunction.process(key, ctx, Collections.singletonList(aggFunction.getResult(acc)), out);
+	}
+
+	@Override
+	public void clear(final W window, final InternalWindowContext context) throws Exception {
+		this.ctx.window = window;
+		this.ctx.internalContext = context;
+		ProcessWindowFunction<V, R, K, W> wrappedFunction = this.wrappedFunction;
+		wrappedFunction.clear(ctx);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/fad201bf/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableAllWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableAllWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableAllWindowFunction.java
index 672bdb6..f2507ed 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableAllWindowFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableAllWindowFunction.java
@@ -39,11 +39,16 @@ public final class InternalIterableAllWindowFunction<IN, OUT, W extends Window>
 	}
 
 	@Override
-	public void apply(Byte key, W window, Iterable<IN> input, Collector<OUT> out) throws Exception {
+	public void process(Byte key, W window, InternalWindowContext context, Iterable<IN> input, Collector<OUT> out) throws Exception {
 		wrappedFunction.apply(window, input, out);
 	}
 
 	@Override
+	public void clear(W window, InternalWindowContext context) throws Exception {
+
+	}
+
+	@Override
 	public RuntimeContext getRuntimeContext() {
 		throw new RuntimeException("This should never be called.");
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/fad201bf/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableProcessAllWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableProcessAllWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableProcessAllWindowFunction.java
index e33cc2a..47b7d55 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableProcessAllWindowFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableProcessAllWindowFunction.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.runtime.operators.windowing.functions;
 import org.apache.flink.api.common.functions.IterationRuntimeContext;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.java.operators.translation.WrappingFunction;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
 import org.apache.flink.streaming.api.windowing.windows.Window;
 import org.apache.flink.util.Collector;
@@ -34,21 +35,33 @@ public final class InternalIterableProcessAllWindowFunction<IN, OUT, W extends W
 
 	private static final long serialVersionUID = 1L;
 
+	private transient InternalProcessAllWindowContext<IN, OUT, W> ctx;
+
 	public InternalIterableProcessAllWindowFunction(ProcessAllWindowFunction<IN, OUT, W> wrappedFunction) {
 		super(wrappedFunction);
 	}
 
 	@Override
-	public void apply(Byte key, final W window, Iterable<IN> input, Collector<OUT> out) throws Exception {
+	public void open(Configuration parameters) throws Exception {
+		super.open(parameters);
+		ProcessAllWindowFunction<IN, OUT, W> wrappedFunction = this.wrappedFunction;
+		this.ctx = new InternalProcessAllWindowContext<>(wrappedFunction);
+	}
+
+	@Override
+	public void process(Byte key, final W window, final InternalWindowContext context, Iterable<IN> input, Collector<OUT> out) throws Exception {
+		this.ctx.window = window;
+		this.ctx.internalContext = context;
+		ProcessAllWindowFunction<IN, OUT, W> wrappedFunction = this.wrappedFunction;
+		wrappedFunction.process(ctx, input, out);
+	}
+
+	@Override
+	public void clear(final W window, final InternalWindowContext context) throws Exception {
+		this.ctx.window = window;
+		this.ctx.internalContext = context;
 		ProcessAllWindowFunction<IN, OUT, W> wrappedFunction = this.wrappedFunction;
-		ProcessAllWindowFunction<IN, OUT, W>.Context context = wrappedFunction.new Context() {
-			@Override
-			public W window() {
-				return window;
-			}
-		};
-
-		wrappedFunction.process(context, input, out);
+		wrappedFunction.clear(ctx);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/fad201bf/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableProcessWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableProcessWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableProcessWindowFunction.java
index de516a5..7eb015e 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableProcessWindowFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableProcessWindowFunction.java
@@ -34,21 +34,27 @@ public final class InternalIterableProcessWindowFunction<IN, OUT, KEY, W extends
 
 	private static final long serialVersionUID = 1L;
 
+	private final InternalProcessWindowContext<IN, OUT, KEY, W> ctx;
+
 	public InternalIterableProcessWindowFunction(ProcessWindowFunction<IN, OUT, KEY, W> wrappedFunction) {
 		super(wrappedFunction);
+		this.ctx = new InternalProcessWindowContext<>(wrappedFunction);
+	}
+
+	@Override
+	public void process(KEY key, final W window, final InternalWindowContext context, Iterable<IN> input, Collector<OUT> out) throws Exception {
+		this.ctx.window = window;
+		this.ctx.internalContext = context;
+		ProcessWindowFunction<IN, OUT, KEY, W> wrappedFunction = this.wrappedFunction;
+		wrappedFunction.process(key, ctx, input, out);
 	}
 
 	@Override
-	public void apply(KEY key, final W window, Iterable<IN> input, Collector<OUT> out) throws Exception {
+	public void clear(final W window, final InternalWindowContext context) throws Exception {
+		this.ctx.window = window;
+		this.ctx.internalContext = context;
 		ProcessWindowFunction<IN, OUT, KEY, W> wrappedFunction = this.wrappedFunction;
-		ProcessWindowFunction<IN, OUT, KEY, W>.Context context = wrappedFunction.new Context() {
-			@Override
-			public W window() {
-				return window;
-			}
-		};
-		
-		wrappedFunction.process(key, context, input, out);
+		wrappedFunction.clear(ctx);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/fad201bf/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableWindowFunction.java
index 895b31f..e2f1517 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableWindowFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableWindowFunction.java
@@ -39,11 +39,16 @@ public final class InternalIterableWindowFunction<IN, OUT, KEY, W extends Window
 	}
 
 	@Override
-	public void apply(KEY key, W window, Iterable<IN> input, Collector<OUT> out) throws Exception {
+	public void process(KEY key, W window, InternalWindowContext context, Iterable<IN> input, Collector<OUT> out) throws Exception {
 		wrappedFunction.apply(key, window, input, out);
 	}
 
 	@Override
+	public void clear(W window, InternalWindowContext context) throws Exception {
+
+	}
+
+	@Override
 	public RuntimeContext getRuntimeContext() {
 		throw new RuntimeException("This should never be called.");
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/fad201bf/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalProcessAllWindowContext.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalProcessAllWindowContext.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalProcessAllWindowContext.java
new file mode 100644
index 0000000..c70e161
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalProcessAllWindowContext.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators.windowing.functions;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.state.KeyedStateStore;
+import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+
+/**
+ * Internal reusable context wrapper.
+ *
+ * @param <IN> The type of the input value.
+ * @param <OUT> The type of the output value.
+ * @param <W> The type of the window.
+ */
+@Internal
+public class InternalProcessAllWindowContext<IN, OUT, W extends Window>
+	extends ProcessAllWindowFunction<IN, OUT, W>.Context {
+
+	W window;
+	InternalWindowFunction.InternalWindowContext internalContext;
+
+	InternalProcessAllWindowContext(ProcessAllWindowFunction<IN, OUT, W> function) {
+		function.super();
+	}
+
+	@Override
+	public W window() {
+		return window;
+	}
+
+	@Override
+	public KeyedStateStore windowState() {
+		return internalContext.windowState();
+	}
+
+	@Override
+	public KeyedStateStore globalState() {
+		return internalContext.globalState();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/fad201bf/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalProcessWindowContext.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalProcessWindowContext.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalProcessWindowContext.java
new file mode 100644
index 0000000..0f1c0ee
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalProcessWindowContext.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators.windowing.functions;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.state.KeyedStateStore;
+import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+
+/**
+ * Internal reusable context wrapper.
+ *
+ * @param <IN> The type of the input value.
+ * @param <OUT> The type of the output value.
+ * @param <KEY> The type of the key.
+ * @param <W> The type of the window.
+ */
+@Internal
+public class InternalProcessWindowContext<IN, OUT, KEY, W extends Window>
+	extends ProcessWindowFunction<IN, OUT, KEY, W>.Context {
+
+	W window;
+	InternalWindowFunction.InternalWindowContext internalContext;
+
+	InternalProcessWindowContext(ProcessWindowFunction<IN, OUT, KEY, W> function) {
+		function.super();
+	}
+
+	@Override
+	public W window() {
+		return window;
+	}
+
+	@Override
+	public KeyedStateStore windowState() {
+		return internalContext.windowState();
+	}
+
+	@Override
+	public KeyedStateStore globalState() {
+		return internalContext.globalState();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/fad201bf/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueAllWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueAllWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueAllWindowFunction.java
index a34d3ec..e90bcf4 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueAllWindowFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueAllWindowFunction.java
@@ -41,11 +41,16 @@ public final class InternalSingleValueAllWindowFunction<IN, OUT, W extends Windo
 	}
 
 	@Override
-	public void apply(Byte key, W window, IN input, Collector<OUT> out) throws Exception {
+	public void process(Byte key, W window, InternalWindowContext context, IN input, Collector<OUT> out) throws Exception {
 		wrappedFunction.apply(window, Collections.singletonList(input), out);
 	}
 
 	@Override
+	public void clear(W window, InternalWindowContext context) throws Exception {
+
+	}
+
+	@Override
 	public RuntimeContext getRuntimeContext() {
 		throw new RuntimeException("This should never be called.");
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/fad201bf/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueProcessAllWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueProcessAllWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueProcessAllWindowFunction.java
index 0284ef7..f7c6a08 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueProcessAllWindowFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueProcessAllWindowFunction.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.runtime.operators.windowing.functions;
 import org.apache.flink.api.common.functions.IterationRuntimeContext;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.java.operators.translation.WrappingFunction;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
 import org.apache.flink.streaming.api.windowing.windows.Window;
 import org.apache.flink.util.Collector;
@@ -36,21 +37,33 @@ public final class InternalSingleValueProcessAllWindowFunction<IN, OUT, W extend
 
 	private static final long serialVersionUID = 1L;
 
+	private transient InternalProcessAllWindowContext<IN, OUT, W> ctx;
+
 	public InternalSingleValueProcessAllWindowFunction(ProcessAllWindowFunction<IN, OUT, W> wrappedFunction) {
 		super(wrappedFunction);
 	}
 
 	@Override
-	public void apply(Byte key, final W window, IN input, Collector<OUT> out) throws Exception {
+	public void open(Configuration parameters) throws Exception {
+		super.open(parameters);
+		ProcessAllWindowFunction<IN, OUT, W> wrappedFunction = this.wrappedFunction;
+		this.ctx = new InternalProcessAllWindowContext<>(wrappedFunction);
+	}
+
+	@Override
+	public void process(Byte key, final W window, final InternalWindowContext context, IN input, Collector<OUT> out) throws Exception {
+		this.ctx.window = window;
+		this.ctx.internalContext = context;
 		ProcessAllWindowFunction<IN, OUT, W> wrappedFunction = this.wrappedFunction;
-		ProcessAllWindowFunction<IN, OUT, W>.Context context = wrappedFunction.new Context() {
-			@Override
-			public W window() {
-				return window;
-			}
-		};
+		wrappedFunction.process(ctx, Collections.singletonList(input), out);
+	}
 
-		wrappedFunction.process(context, Collections.singletonList(input), out);
+	@Override
+	public void clear(final W window, final InternalWindowContext context) throws Exception {
+		this.ctx.window = window;
+		this.ctx.internalContext = context;
+		ProcessAllWindowFunction<IN, OUT, W> wrappedFunction = this.wrappedFunction;
+		wrappedFunction.clear(ctx);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/fad201bf/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueProcessWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueProcessWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueProcessWindowFunction.java
index 7a4e8c6..21d1639 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueProcessWindowFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueProcessWindowFunction.java
@@ -36,21 +36,29 @@ public final class InternalSingleValueProcessWindowFunction<IN, OUT, KEY, W exte
 
 	private static final long serialVersionUID = 1L;
 
+	private final InternalProcessWindowContext<IN, OUT, KEY, W> ctx;
+
 	public InternalSingleValueProcessWindowFunction(ProcessWindowFunction<IN, OUT, KEY, W> wrappedFunction) {
 		super(wrappedFunction);
+		ctx = new InternalProcessWindowContext<>(wrappedFunction);
 	}
 
 	@Override
-	public void apply(KEY key, final W window, IN input, Collector<OUT> out) throws Exception {
+	public void process(KEY key, final W window, final InternalWindowContext context, IN input, Collector<OUT> out) throws Exception {
+		this.ctx.window = window;
+		this.ctx.internalContext = context;
+
 		ProcessWindowFunction<IN, OUT, KEY, W> wrappedFunction = this.wrappedFunction;
-		ProcessWindowFunction<IN, OUT, KEY, W>.Context context = wrappedFunction.new Context() {
-			@Override
-			public W window() {
-				return window;
-			}
-		};
+		wrappedFunction.process(key, ctx, Collections.singletonList(input), out);
+	}
 
-		wrappedFunction.process(key, context, Collections.singletonList(input), out);
+	@Override
+	public void clear(final W window, final InternalWindowContext context) throws Exception {
+		this.ctx.window = window;
+		this.ctx.internalContext = context;
+
+		ProcessWindowFunction<IN, OUT, KEY, W> wrappedFunction = this.wrappedFunction;
+		wrappedFunction.clear(ctx);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/fad201bf/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueWindowFunction.java
index 9a0a447..d5cc4a2 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueWindowFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueWindowFunction.java
@@ -41,11 +41,16 @@ public final class InternalSingleValueWindowFunction<IN, OUT, KEY, W extends Win
 	}
 
 	@Override
-	public void apply(KEY key, W window, IN input, Collector<OUT> out) throws Exception {
+	public void process(KEY key, W window, InternalWindowContext context, IN input, Collector<OUT> out) throws Exception {
 		wrappedFunction.apply(key, window, Collections.singletonList(input), out);
 	}
 
 	@Override
+	public void clear(W window, InternalWindowContext context) throws Exception {
+
+	}
+
+	@Override
 	public RuntimeContext getRuntimeContext() {
 		throw new RuntimeException("This should never be called.");
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/fad201bf/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalWindowFunction.java
index 2eb4052..9834480 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalWindowFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalWindowFunction.java
@@ -18,6 +18,7 @@
 package org.apache.flink.streaming.runtime.operators.windowing.functions;
 
 import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.state.KeyedStateStore;
 import org.apache.flink.streaming.api.windowing.windows.Window;
 import org.apache.flink.util.Collector;
 
@@ -29,15 +30,28 @@ import org.apache.flink.util.Collector;
  * @param <KEY> The type of the key.
  */
 public interface InternalWindowFunction<IN, OUT, KEY, W extends Window> extends Function {
-
 	/**
 	 * Evaluates the window and outputs none or several elements.
 	 *
-	 * @param key    The key for which this window is evaluated.
-	 * @param window The window that is being evaluated.
-	 * @param input  The elements in the window being evaluated.
-	 * @param out    A collector for emitting elements.
+	 * @param context The context in which the window is being evaluated.
+	 * @param input The elements in the window being evaluated.
+	 * @param out A collector for emitting elements.
+	 *
 	 * @throws Exception The function may throw exceptions to fail the program and trigger recovery.
 	 */
-	void apply(KEY key, W window, IN input, Collector<OUT> out) throws Exception;
+	void process(KEY key, W window, InternalWindowContext context, IN input, Collector<OUT> out) throws Exception;
+
+	/**
+	 * Deletes any state in the {@code Context} when the Window is purged.
+	 *
+	 * @param context The context in which the window is being cleared.
+	 * @throws Exception The function may throw exceptions to fail the program and trigger recovery.
+	 */
+	void clear(W window, InternalWindowContext context) throws Exception;
+
+	interface InternalWindowContext extends java.io.Serializable {
+		KeyedStateStore windowState();
+
+		KeyedStateStore globalState();
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/fad201bf/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyProcessWindowFunctionTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyProcessWindowFunctionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyProcessWindowFunctionTest.java
index 4b479f3..c4bed37 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyProcessWindowFunctionTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyProcessWindowFunctionTest.java
@@ -21,20 +21,28 @@ package org.apache.flink.streaming.api.operators;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.functions.FoldFunction;
 import org.apache.flink.api.common.functions.util.ListCollector;
+import org.apache.flink.api.common.state.FoldingState;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
+import org.apache.flink.api.common.state.KeyedStateStore;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ReducingState;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeutils.base.ByteSerializer;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
-import org.apache.flink.api.java.Utils;
 import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.functions.windowing.FoldApplyProcessAllWindowFunction;
 import org.apache.flink.streaming.api.functions.windowing.FoldApplyProcessWindowFunction;
-import org.apache.flink.streaming.api.functions.windowing.FoldApplyWindowFunction;
 import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
 import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
-import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
 import org.apache.flink.streaming.api.graph.StreamGraph;
 import org.apache.flink.streaming.api.graph.StreamGraphGenerator;
 import org.apache.flink.streaming.api.transformations.OneInputTransformation;
@@ -45,8 +53,8 @@ import org.apache.flink.streaming.runtime.operators.windowing.AccumulatingProces
 import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessAllWindowFunction;
 import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction;
 import org.apache.flink.util.Collector;
-import org.junit.Test;
 import org.junit.Assert;
+import org.junit.Test;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -139,12 +147,26 @@ public class FoldApplyProcessWindowFunctionTest {
 
 		expected.add(initValue);
 
-		foldWindowFunction.process(0, foldWindowFunction.new Context() {
+		FoldApplyProcessWindowFunction<Integer, TimeWindow, Integer, Integer, Integer>.Context ctx = foldWindowFunction.new Context() {
 			@Override
 			public TimeWindow window() {
 				return new TimeWindow(0, 1);
 			}
-		}, input, new ListCollector<>(result));
+
+			@Override
+			public KeyedStateStore windowState() {
+				return new DummyKeyedStateStore();
+			}
+
+			@Override
+			public KeyedStateStore globalState() {
+				return new DummyKeyedStateStore();
+			}
+		};
+
+		foldWindowFunction.open(new Configuration());
+
+		foldWindowFunction.process(0, ctx, input, new ListCollector<>(result));
 
 		Assert.assertEquals(expected, result);
 	}
@@ -234,16 +256,58 @@ public class FoldApplyProcessWindowFunctionTest {
 
 		expected.add(initValue);
 
-		foldWindowFunction.process(foldWindowFunction.new Context() {
+		FoldApplyProcessAllWindowFunction<TimeWindow, Integer, Integer, Integer>.Context ctx = foldWindowFunction.new Context() {
 			@Override
 			public TimeWindow window() {
 				return new TimeWindow(0, 1);
 			}
-		}, input, new ListCollector<>(result));
+
+			@Override
+			public KeyedStateStore windowState() {
+				return new DummyKeyedStateStore();
+			}
+
+			@Override
+			public KeyedStateStore globalState() {
+				return new DummyKeyedStateStore();
+			}
+		};
+
+		foldWindowFunction.open(new Configuration());
+
+		foldWindowFunction.process(ctx, input, new ListCollector<>(result));
 
 		Assert.assertEquals(expected, result);
 	}
 
+	public static class DummyKeyedStateStore implements KeyedStateStore {
+
+		@Override
+		public <T> ValueState<T> getState(ValueStateDescriptor<T> stateProperties) {
+			return null;
+		}
+
+		@Override
+		public <T> ListState<T> getListState(ListStateDescriptor<T> stateProperties) {
+			return null;
+		}
+
+		@Override
+		public <T> ReducingState<T> getReducingState(ReducingStateDescriptor<T> stateProperties) {
+			return null;
+		}
+
+		@Override
+		public <T, ACC> FoldingState<T, ACC> getFoldingState(FoldingStateDescriptor<T, ACC> stateProperties) {
+			return null;
+		}
+
+		@Override
+		public <UK, UV> MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV> stateProperties) {
+			return null;
+		}
+	}
+
 	public static class DummyStreamExecutionEnvironment extends StreamExecutionEnvironment {
 
 		public DummyStreamExecutionEnvironment() {


Mime
View raw message