flink-commits mailing list archives

From aljos...@apache.org
Subject flink git commit: [FLINK-4207] WindowOperator becomes very slow with allowed lateness
Date Tue, 26 Jul 2016 20:27:00 GMT
Repository: flink
Updated Branches:
  refs/heads/master 884d3e2a4 -> 12bf7c1a0


[FLINK-4207] WindowOperator becomes very slow with allowed lateness


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/12bf7c1a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/12bf7c1a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/12bf7c1a

Branch: refs/heads/master
Commit: 12bf7c1a0b81d199085fe874c64763c51a93b3bf
Parents: 884d3e2
Author: kl0u <kkloudas@gmail.com>
Authored: Mon Jul 18 11:37:06 2016 +0200
Committer: kl0u <kkloudas@gmail.com>
Committed: Tue Jul 26 21:12:05 2016 +0200

----------------------------------------------------------------------
 .../streaming/state/RocksDBFoldingState.java    |   2 +-
 .../streaming/state/RocksDBListState.java       |   3 +-
 .../flink/api/common/state/AppendingState.java  |  10 +-
 .../flink/runtime/state/GenericListState.java   |   7 +-
 .../state/filesystem/FsFoldingState.java        |   7 +-
 .../runtime/state/filesystem/FsListState.java   |  13 +-
 .../runtime/state/memory/MemFoldingState.java   |   7 +-
 .../runtime/state/memory/MemListState.java      |  13 +-
 .../runtime/state/StateBackendTestBase.java     |   8 +-
 .../api/windowing/triggers/PurgingTrigger.java  |   2 +-
 .../windowing/EvictingWindowOperator.java       |  75 +++-
 .../operators/windowing/MergingWindowSet.java   |  14 +-
 .../operators/windowing/WindowOperator.java     |  74 +++-
 .../windowing/MergingWindowSetTest.java         |   7 +-
 .../operators/windowing/WindowOperatorTest.java | 438 +++++++++++++++++++
 .../sessionwindows/SessionWindowITCase.java     |   2 -
 16 files changed, 580 insertions(+), 102 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/12bf7c1a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java
index 8ffe3a6..218fa2a 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java
@@ -93,7 +93,7 @@ public class RocksDBFoldingState<K, N, T, ACC>
 			byte[] key = baos.toByteArray();
 			byte[] valueBytes = backend.db.get(columnFamily, key);
 			if (valueBytes == null) {
-				return stateDesc.getDefaultValue();
+				return null;
 			}
 			return valueSerializer.deserialize(new DataInputViewStreamWrapper(new ByteArrayInputStream(valueBytes)));
 		} catch (IOException|RocksDBException e) {

http://git-wip-us.apache.org/repos/asf/flink/blob/12bf7c1a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
index b13c5ae..ce3a48e 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
@@ -32,7 +32,6 @@ import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
 
 import static java.util.Objects.requireNonNull;
@@ -94,7 +93,7 @@ public class RocksDBListState<K, N, V>
 			byte[] valueBytes = backend.db.get(columnFamily, key);
 
 			if (valueBytes == null) {
-				return Collections.emptyList();
+				return null;
 			}
 
 			ByteArrayInputStream bais = new ByteArrayInputStream(valueBytes);

http://git-wip-us.apache.org/repos/asf/flink/blob/12bf7c1a/flink-core/src/main/java/org/apache/flink/api/common/state/AppendingState.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/AppendingState.java b/flink-core/src/main/java/org/apache/flink/api/common/state/AppendingState.java
index 04dc784..8ea8364 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/AppendingState.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/AppendingState.java
@@ -46,8 +46,14 @@ public interface AppendingState<IN, OUT> extends State {
 	 * operator instance. If state partitioning is applied, the value returned
 	 * depends on the current operator input, as the operator maintains an
 	 * independent state for each partition.
-	 * 
-	 * @return The operator state value corresponding to the current input.
+	 *
+	 * <p>
+	 *     <b>NOTE TO IMPLEMENTERS:</b> if the state is empty, then this method
+	 *     should return {@code null}.
+	 * </p>
+	 *
+	 * @return The operator state value corresponding to the current input or {@code null}
+	 * if the state is empty.
 	 * 
 	 * @throws Exception Thrown if the system cannot access the state.
 	 */

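The Javadoc change above documents the contract that the rest of the commit implements: an empty AppendingState now reports itself as null from get(), rather than as an empty list or the fold's initial value. A minimal caller-side sketch of that contract, assuming only the AppendingState interface shown above (the helper class and method name are hypothetical, not part of the commit):

    import org.apache.flink.api.common.state.AppendingState;

    // Hypothetical helper: under the new contract, "is there anything in this
    // state?" becomes a plain null check on get() instead of a comparison
    // against an empty list or a descriptor default value.
    final class EmptyStateCheck {

        static <IN, OUT> boolean isEmpty(AppendingState<IN, OUT> state) throws Exception {
            return state.get() == null;
        }
    }

This is what lets the window operators further down skip work cheaply when a timer fires for a window whose state has already been purged.
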
http://git-wip-us.apache.org/repos/asf/flink/blob/12bf7c1a/flink-runtime/src/main/java/org/apache/flink/runtime/state/GenericListState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/GenericListState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/GenericListState.java
index 3414855..2e40898 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/GenericListState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/GenericListState.java
@@ -25,7 +25,6 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Collections;
 
 /**
  * Generic implementation of {@link ListState} based on a wrapped {@link ValueState}.
@@ -82,11 +81,7 @@ public class GenericListState<K, N, T, Backend extends AbstractStateBackend, W e
 
 	@Override
 	public Iterable<T> get() throws Exception {
-		ArrayList<T> result = wrappedState.value();
-		if (result == null) {
-			return Collections.emptyList();
-		}
-		return result;
+		return wrappedState.value();
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/12bf7c1a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsFoldingState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsFoldingState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsFoldingState.java
index bba6df5..90baf36 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsFoldingState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsFoldingState.java
@@ -88,11 +88,8 @@ public class FsFoldingState<K, N, T, ACC>
 		if (currentNSState == null) {
 			currentNSState = state.get(currentNamespace);
 		}
-		if (currentNSState != null) {
-			ACC value = currentNSState.get(currentKey);
-			return value != null ? value : stateDesc.getDefaultValue();
-		}
-		return stateDesc.getDefaultValue();
+		return currentNSState != null ?
+			currentNSState.get(currentKey) : null;
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/12bf7c1a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsListState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsListState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsListState.java
index 1d5b5f8..46c9830 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsListState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsListState.java
@@ -27,9 +27,7 @@ import org.apache.flink.runtime.state.KvState;
 import org.apache.flink.runtime.state.KvStateSnapshot;
 
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 
 /**
@@ -85,15 +83,8 @@ public class FsListState<K, N, V>
 		if (currentNSState == null) {
 			currentNSState = state.get(currentNamespace);
 		}
-		if (currentNSState != null) {
-			List<V> result = currentNSState.get(currentKey);
-			if (result == null) {
-				return Collections.emptyList();
-			} else {
-				return result;
-			}
-		}
-		return Collections.emptyList();
+		return currentNSState != null ?
+			currentNSState.get(currentKey) : null;
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/12bf7c1a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemFoldingState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemFoldingState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemFoldingState.java
index 07b677b..9953a64 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemFoldingState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemFoldingState.java
@@ -64,11 +64,8 @@ public class MemFoldingState<K, N, T, ACC>
 		if (currentNSState == null) {
 			currentNSState = state.get(currentNamespace);
 		}
-		if (currentNSState != null) {
-			ACC value = currentNSState.get(currentKey);
-			return value != null ? value : stateDesc.getDefaultValue();
-		}
-		return stateDesc.getDefaultValue();
+		return currentNSState != null ?
+			currentNSState.get(currentKey) : null;
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/12bf7c1a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemListState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemListState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemListState.java
index d5e4dfd..97461d0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemListState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemListState.java
@@ -26,9 +26,7 @@ import org.apache.flink.runtime.state.KvState;
 import org.apache.flink.runtime.state.KvStateSnapshot;
 
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 
 /**
@@ -56,15 +54,8 @@ public class MemListState<K, N, V>
 		if (currentNSState == null) {
 			currentNSState = state.get(currentNamespace);
 		}
-		if (currentNSState != null) {
-			List<V> result = currentNSState.get(currentKey);
-			if (result == null) {
-				return Collections.emptyList();
-			} else {
-				return result;
-			}
-		}
-		return Collections.emptyList();
+		return currentNSState != null ?
+			currentNSState.get(currentKey) : null;
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/12bf7c1a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
index 12cf112..80f1de3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
@@ -243,10 +243,10 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> {
 			Joiner joiner = Joiner.on(",");
 			// some modifications to the state
 			backend.setCurrentKey(1);
-			assertEquals("", joiner.join(state.get()));
+			assertEquals(null, state.get());
 			state.add("1");
 			backend.setCurrentKey(2);
-			assertEquals("", joiner.join(state.get()));
+			assertEquals(null, state.get());
 			state.add("2");
 			backend.setCurrentKey(1);
 			assertEquals("1", joiner.join(state.get()));
@@ -438,10 +438,10 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> {
 
 			// some modifications to the state
 			backend.setCurrentKey(1);
-			assertEquals("Fold-Initial:", state.get());
+			assertEquals(null, state.get());
 			state.add(1);
 			backend.setCurrentKey(2);
-			assertEquals("Fold-Initial:", state.get());
+			assertEquals(null, state.get());
 			state.add(2);
 			backend.setCurrentKey(1);
 			assertEquals("Fold-Initial:,1", state.get());

http://git-wip-us.apache.org/repos/asf/flink/blob/12bf7c1a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/PurgingTrigger.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/PurgingTrigger.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/PurgingTrigger.java
index 85d0b52..8b30130 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/PurgingTrigger.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/PurgingTrigger.java
@@ -72,7 +72,7 @@ public class PurgingTrigger<T, W extends Window> extends Trigger<T, W> {
 	@Override
 	public TriggerResult onMerge(W window, OnMergeContext ctx) throws Exception {
 		TriggerResult triggerResult = nestedTrigger.onMerge(window, ctx);
-		return triggerResult.isPurge() ? TriggerResult.FIRE_AND_PURGE : triggerResult;
+		return triggerResult.isFire() ? TriggerResult.FIRE_AND_PURGE : triggerResult;
 	}
 
 	@Override

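The one-line fix above corrects PurgingTrigger#onMerge: previously the nested trigger's result was promoted to FIRE_AND_PURGE only when it purged, so a plain FIRE passed through unwrapped; it is now promoted when it fires. A standalone sketch of the corrected decision, using Flink's TriggerResult as imported in the diff (the wrapper class and method name here are hypothetical):

    import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;

    // Hypothetical sketch of the corrected onMerge decision: a firing result
    // from the wrapped trigger is upgraded to FIRE_AND_PURGE; everything else
    // (CONTINUE, PURGE) is passed through unchanged.
    final class PurgingMergeSketch {

        static TriggerResult wrapOnMerge(TriggerResult nestedResult) {
            return nestedResult.isFire() ? TriggerResult.FIRE_AND_PURGE : nestedResult;
        }
    }
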
http://git-wip-us.apache.org/repos/asf/flink/blob/12bf7c1a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
index 3f2c6a3..15f716c 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
@@ -132,12 +132,14 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends Window
 
 				// check if the window is already inactive
 				if (isLate(actualWindow)) {
-					LOG.info("Dropped element " + element + " for window " + actualWindow + " due to lateness.");
 					mergingWindows.retireWindow(actualWindow);
 					continue;
 				}
 
 				W stateWindow = mergingWindows.getStateWindow(actualWindow);
+				if (stateWindow == null) {
+					throw new IllegalStateException("Window " + window + " is not in in-flight window set.");
+				}
 				ListState<StreamRecord<IN>> windowState = getPartitionedState(
 					stateWindow, windowSerializer, windowStateDescriptor);
 				windowState.add(element);
@@ -149,7 +151,15 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends Window
 				// on the (possibly merged) window
 				TriggerResult triggerResult = context.onElement(element);
 				TriggerResult combinedTriggerResult = TriggerResult.merge(triggerResult, mergeTriggerResult.f0);
-				fireOrContinue(combinedTriggerResult, actualWindow, windowState);
+
+				if (combinedTriggerResult.isFire()) {
+					Iterable<StreamRecord<IN>> contents = windowState.get();
+					if (contents == null) {
+						// if we have no state, there is nothing to do
+						continue;
+					}
+					fire(actualWindow, contents);
+				}
 
 				if (combinedTriggerResult.isPurge()) {
 					cleanup(actualWindow, windowState, mergingWindows);
@@ -163,7 +173,6 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends Window
 
 				// check if the window is already inactive
 				if (isLate(window)) {
-					LOG.info("Dropped element " + element + " for window " + window + " due to lateness.");
 					continue;
 				}
 
@@ -175,7 +184,15 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends Window
 				context.window = window;
 
 				TriggerResult triggerResult = context.onElement(element);
-				fireOrContinue(triggerResult, window, windowState);
+
+				if (triggerResult.isFire()) {
+					Iterable<StreamRecord<IN>> contents = windowState.get();
+					if (contents == null) {
+						// if we have no state, there is nothing to do
+						continue;
+					}
+					fire(window, contents);
+				}
 
 				if (triggerResult.isPurge()) {
 					cleanup(window, windowState, null);
@@ -207,16 +224,30 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends Window
 				if (windowAssigner instanceof MergingWindowAssigner) {
 					mergingWindows = getMergingWindowSet();
 					W stateWindow = mergingWindows.getStateWindow(context.window);
+					if (stateWindow == null) {
+						// then the window is already purged and this is a cleanup
+						// timer set due to allowed lateness that has nothing to clean,
+						// so it is safe to just ignore
+						continue;
+					}
 					windowState = getPartitionedState(stateWindow, windowSerializer, windowStateDescriptor);
 				} else {
 					windowState = getPartitionedState(context.window, windowSerializer, windowStateDescriptor);
 				}
 
+				Iterable<StreamRecord<IN>> contents = windowState.get();
+				if (contents == null) {
+					// if we have no state, there is nothing to do
+					continue;
+				}
+
 				TriggerResult triggerResult = context.onEventTime(timer.timestamp);
-				fireOrContinue(triggerResult, context.window, windowState);
+				if (triggerResult.isFire()) {
+					fire(context.window, contents);
+				}
 
-				if (triggerResult.isPurge() || (windowAssigner.isEventTime() && isCleanupTime(timer.window, timer.timestamp))) {
-					cleanup(timer.window, windowState, mergingWindows);
+				if (triggerResult.isPurge() || (windowAssigner.isEventTime() && isCleanupTime(context.window, timer.timestamp))) {
+					cleanup(context.window, windowState, mergingWindows);
 				}
 
 			} else {
@@ -255,16 +286,30 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends Window
 				if (windowAssigner instanceof MergingWindowAssigner) {
 					mergingWindows = getMergingWindowSet();
 					W stateWindow = mergingWindows.getStateWindow(context.window);
+					if (stateWindow == null) {
+						// then the window is already purged and this is a cleanup
+						// timer set due to allowed lateness that has nothing to clean,
+						// so it is safe to just ignore
+						continue;
+					}
 					windowState = getPartitionedState(stateWindow, windowSerializer, windowStateDescriptor);
 				} else {
 					windowState = getPartitionedState(context.window, windowSerializer, windowStateDescriptor);
 				}
 
+				Iterable<StreamRecord<IN>> contents = windowState.get();
+				if (contents == null) {
+					// if we have no state, there is nothing to do
+					continue;
+				}
+
 				TriggerResult triggerResult = context.onProcessingTime(timer.timestamp);
-				fireOrContinue(triggerResult, context.window, windowState);
+				if (triggerResult.isFire()) {
+					fire(context.window, contents);
+				}
 
-				if (triggerResult.isPurge() || (!windowAssigner.isEventTime() && isCleanupTime(timer.window, timer.timestamp))) {
-					cleanup(timer.window, windowState, mergingWindows);
+				if (triggerResult.isPurge() || (!windowAssigner.isEventTime() && isCleanupTime(context.window, timer.timestamp))) {
+					cleanup(context.window, windowState, mergingWindows);
 				}
 
 			} else {
@@ -273,15 +318,8 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends Window
 		} while (fire);
 	}
 
-	private void fireOrContinue(TriggerResult triggerResult,
-								W window,
-								ListState<StreamRecord<IN>> windowState) throws Exception {
-		if (!triggerResult.isFire()) {
-			return;
-		}
-
+	private void fire(W window, Iterable<StreamRecord<IN>> contents) throws Exception {
 		timestampedCollector.setAbsoluteTimestamp(window.maxTimestamp());
-		Iterable<StreamRecord<IN>> contents = windowState.get();
 
 		// Work around type system restrictions...
 		int toEvict = evictor.evict((Iterable) contents, Iterables.size(contents), context.window);
@@ -307,7 +345,6 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends Window
 			mergingWindows.retireWindow(window);
 		}
 		context.clear();
-		deleteCleanupTimer(window);
 	}
 
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/12bf7c1a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSet.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSet.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSet.java
index d02a348..c806d2d 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSet.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSet.java
@@ -80,8 +80,11 @@ public class MergingWindowSet<W extends Window> {
 		this.windowAssigner = windowAssigner;
 		windows = new HashMap<>();
 
-		for (Tuple2<W, W> window: state.get()) {
-			windows.put(window.f0, window.f1);
+		Iterable<Tuple2<W, W>> windowState = state.get();
+		if (windowState != null) {
+			for (Tuple2<W, W> window: windowState) {
+				windows.put(window.f0, window.f1);
+			}
 		}
 	}
 
@@ -100,12 +103,7 @@ public class MergingWindowSet<W extends Window> {
 	 * @param window The window for which to get the state window.
 	 */
 	public W getStateWindow(W window) {
-		W result = windows.get(window);
-		if (result == null) {
-			throw new IllegalStateException("Window " + window + " is not in in-flight window set.");
-		}
-
-		return result;
+		return windows.get(window);
 	}
 
 	/**

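With the change above, MergingWindowSet#getStateWindow no longer throws for an unknown window; it returns null and leaves the decision to the caller. A simplified standalone sketch of that contract (hypothetical class, plain HashMap standing in for the real bookkeeping):

    import java.util.HashMap;
    import java.util.Map;

    // Hypothetical sketch: a retired window simply has no mapping any more,
    // so getStateWindow returns null instead of throwing IllegalStateException.
    final class WindowMappingSketch<W> {

        private final Map<W, W> windows = new HashMap<>();

        void put(W window, W stateWindow) {
            windows.put(window, stateWindow);
        }

        W getStateWindow(W window) {
            return windows.get(window); // null once the window has been retired
        }
    }

In the operators this split shows up as: the element path still treats null as an error and throws IllegalStateException itself, while the timer paths treat null as an already-purged window whose leftover cleanup timer can safely be ignored.
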
http://git-wip-us.apache.org/repos/asf/flink/blob/12bf7c1a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
index bb05d2b..2434843 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
@@ -334,12 +334,15 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 
 				// drop if the window is already late
 				if (isLate(actualWindow)) {
-					LOG.info("Dropped element " + element+ " for window " + actualWindow + " due to lateness.");
 					mergingWindows.retireWindow(actualWindow);
 					continue;
 				}
 
 				W stateWindow = mergingWindows.getStateWindow(actualWindow);
+				if (stateWindow == null) {
+					throw new IllegalStateException("Window " + window + " is not in in-flight window set.");
+				}
+
 				AppendingState<IN, ACC> windowState = getPartitionedState(
 					stateWindow, windowSerializer, windowStateDescriptor);
 				windowState.add(element.getValue());
@@ -351,7 +354,14 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 				// on the (possibly merged) window
 				TriggerResult triggerResult = context.onElement(element);
 				TriggerResult combinedTriggerResult = TriggerResult.merge(triggerResult, mergeTriggerResult.f0);
-				fireOrContinue(combinedTriggerResult, actualWindow, windowState);
+
+				if (combinedTriggerResult.isFire()) {
+					ACC contents = windowState.get();
+					if (contents == null) {
+						continue;
+					}
+					fire(actualWindow, contents);
+				}
 
 				if (combinedTriggerResult.isPurge()) {
 					cleanup(actualWindow, windowState, mergingWindows);
@@ -364,7 +374,6 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 
 				// drop if the window is already late
 				if (isLate(window)) {
-					LOG.info("Dropped element " + element + " for window " + window + " due to lateness.");
 					continue;
 				}
 
@@ -376,7 +385,14 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 				context.window = window;
 
 				TriggerResult triggerResult = context.onElement(element);
-				fireOrContinue(triggerResult, window, windowState);
+
+				if (triggerResult.isFire()) {
+					ACC contents = windowState.get();
+					if (contents == null) {
+						continue;
+					}
+					fire(window, contents);
+				}
 
 				if (triggerResult.isPurge()) {
 					cleanup(window, windowState, null);
@@ -408,16 +424,30 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 				if (windowAssigner instanceof MergingWindowAssigner) {
 					mergingWindows = getMergingWindowSet();
 					W stateWindow = mergingWindows.getStateWindow(context.window);
+					if (stateWindow == null) {
+						// then the window is already purged and this is a cleanup
+						// timer set due to allowed lateness that has nothing to clean,
+						// so it is safe to just ignore
+						continue;
+					}
 					windowState = getPartitionedState(stateWindow, windowSerializer, windowStateDescriptor);
 				} else {
 					windowState = getPartitionedState(context.window, windowSerializer, windowStateDescriptor);
 				}
 
+				ACC contents = windowState.get();
+				if (contents == null) {
+					// if we have no state, there is nothing to do
+					continue;
+				}
+
 				TriggerResult triggerResult = context.onEventTime(timer.timestamp);
-				fireOrContinue(triggerResult, context.window, windowState);
+				if (triggerResult.isFire()) {
+					fire(context.window, contents);
+				}
 
-				if (triggerResult.isPurge() || (windowAssigner.isEventTime() && isCleanupTime(timer.window, timer.timestamp))) {
-					cleanup(timer.window, windowState, mergingWindows);
+				if (triggerResult.isPurge() || (windowAssigner.isEventTime() && isCleanupTime(context.window, timer.timestamp))) {
+					cleanup(context.window, windowState, mergingWindows);
 				}
 
 			} else {
@@ -456,16 +486,30 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 				if (windowAssigner instanceof MergingWindowAssigner) {
 					mergingWindows = getMergingWindowSet();
 					W stateWindow = mergingWindows.getStateWindow(context.window);
+					if (stateWindow == null) {
+						// then the window is already purged and this is a cleanup
+						// timer set due to allowed lateness that has nothing to clean,
+						// so it is safe to just ignore
+						continue;
+					}
 					windowState = getPartitionedState(stateWindow, windowSerializer, windowStateDescriptor);
 				} else {
 					windowState = getPartitionedState(context.window, windowSerializer, windowStateDescriptor);
 				}
 
+				ACC contents = windowState.get();
+				if (contents == null) {
+					// if we have no state, there is nothing to do
+					continue;
+				}
+
 				TriggerResult triggerResult = context.onProcessingTime(timer.timestamp);
-				fireOrContinue(triggerResult, context.window, windowState);
+				if (triggerResult.isFire()) {
+					fire(context.window, contents);
+				}
 
-				if (triggerResult.isPurge() || (!windowAssigner.isEventTime() && isCleanupTime(timer.window, timer.timestamp))) {
-					cleanup(timer.window, windowState, mergingWindows);
+				if (triggerResult.isPurge() || (!windowAssigner.isEventTime() && isCleanupTime(context.window, timer.timestamp))) {
+					cleanup(context.window, windowState, mergingWindows);
 				}
 
 			} else {
@@ -487,7 +531,6 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 			mergingWindows.retireWindow(window);
 		}
 		context.clear();
-		deleteCleanupTimer(window);
 	}
 
 	/**
@@ -495,15 +538,8 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 	 * The caller must ensure that the correct key is set in the state backend and the context object.
 	 */
 	@SuppressWarnings("unchecked")
-	private void fireOrContinue(TriggerResult triggerResult,
-								W window,
-								AppendingState<IN, ACC> windowState) throws Exception {
-		if (!triggerResult.isFire()) {
-			return;
-		}
-
+	private void fire(W window, ACC contents) throws Exception {
 		timestampedCollector.setAbsoluteTimestamp(window.maxTimestamp());
-		ACC contents = windowState.get();
 		userFunction.apply(context.key, context.window, contents, timestampedCollector);
 	}
 

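The same pattern appears in both WindowOperator and EvictingWindowOperator above: fetch the window contents once, treat a null result as "state already gone" (typically a leftover allowed-lateness cleanup timer) and do nothing, otherwise let the trigger result decide between firing and purging. A hedged, simplified sketch of that timer path, assuming Flink's AppendingState and TriggerResult as they appear in the diff (the helper class, interface and parameter names are hypothetical):

    import org.apache.flink.api.common.state.AppendingState;
    import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;

    // Hypothetical sketch, not the operator's actual code.
    final class TimerFiringSketch {

        /** Minimal stand-ins for the operator's fire() and cleanup() calls. */
        interface WindowActions<ACC> {
            void fire(ACC contents) throws Exception;
            void cleanup() throws Exception;
        }

        static <IN, ACC> void onTimer(AppendingState<IN, ACC> windowState,
                                      TriggerResult triggerResult,
                                      boolean isCleanupTime,
                                      WindowActions<ACC> actions) throws Exception {
            ACC contents = windowState.get();
            if (contents == null) {
                // empty state: nothing to fire, nothing to clean up
                return;
            }
            if (triggerResult.isFire()) {
                actions.fire(contents);
            }
            if (triggerResult.isPurge() || isCleanupTime) {
                actions.cleanup();
            }
        }
    }

Note that in the actual operators the null check happens before the trigger is even asked, and cleanup() no longer deletes the cleanup timer: a timer that later fires on empty state is now ignored cheaply instead of materializing default or empty contents.
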
http://git-wip-us.apache.org/repos/asf/flink/blob/12bf7c1a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSetTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSetTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSetTest.java
index cf90f8a..939f13f 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSetTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSetTest.java
@@ -133,12 +133,7 @@ public class MergingWindowSetTest {
 		// retire the first batch of windows
 		windowSet.retireWindow(new TimeWindow(0, 6));
 
-		try {
-			windowSet.getStateWindow(new TimeWindow(0, 6));
-			fail("Expected exception");
-		} catch (IllegalStateException e) {
-			//ignore
-		}
+		assertTrue(windowSet.getStateWindow(new TimeWindow(0, 6)) == null);
 
 		assertTrue(windowSet.getStateWindow(new TimeWindow(10, 15)).equals(new TimeWindow(11, 14)));
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/12bf7c1a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
index 90bd3f2..62266c4 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
@@ -17,9 +17,12 @@
  */
 package org.apache.flink.streaming.runtime.operators.windowing;
 
+import com.google.common.base.Joiner;
 import com.google.common.collect.Iterables;
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.FoldFunction;
 import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.ReducingStateDescriptor;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
@@ -51,6 +54,8 @@ import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
 import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
 import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
 import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger;
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
 import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 import org.apache.flink.streaming.api.windowing.windows.Window;
@@ -1952,10 +1957,379 @@ public class WindowOperatorTest {
 		testHarness.close();
 	}
 
+	@Test
+	public void testCleanupTimerWithEmptyListStateForTumblingWindows2() throws Exception {
+		final int WINDOW_SIZE = 2;
+		final long LATENESS = 100;
+
+		TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");
+
+		ListStateDescriptor<Tuple2<String, Integer>> windowStateDesc =
+			new ListStateDescriptor<>("window-contents", inputType.createSerializer(new ExecutionConfig()));
+
+		WindowOperator<String, Tuple2<String, Integer>, Iterable<Tuple2<String, Integer>>, String, TimeWindow> operator =
+			new WindowOperator<>(
+				TumblingEventTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)),
+				new TimeWindow.Serializer(),
+				new TupleKeySelector(),
+				BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
+				windowStateDesc,
+				new InternalIterableWindowFunction<>(new PassThroughFunction2()),
+				new EventTimeTriggerAccumGC(LATENESS),
+				LATENESS);
+
+		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, String> testHarness =
+			new OneInputStreamOperatorTestHarness<>(operator);
+
+		testHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
+
+		operator.setInputType(inputType, new ExecutionConfig());
+		testHarness.open();
+
+		ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();
+
+		// normal element
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1000));
+		testHarness.processWatermark(new Watermark(1599));
+		testHarness.processWatermark(new Watermark(1999));
+		testHarness.processWatermark(new Watermark(2100));
+		testHarness.processWatermark(new Watermark(5000));
+
+		expected.add(new Watermark(1599));
+		expected.add(new StreamRecord<>("GOT: (key2,1)", 1999));
+		expected.add(new Watermark(1999)); // here it fires and purges
+		expected.add(new Watermark(2100)); // here is the cleanup timer
+		expected.add(new Watermark(5000));
+
+		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expected, testHarness.getOutput(), new Tuple2ResultSortComparator());
+		testHarness.close();
+	}
+
+	private class PassThroughFunction2 implements WindowFunction<Tuple2<String, Integer>, String, String, TimeWindow> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public void apply(String k, TimeWindow window, Iterable<Tuple2<String, Integer>> input, Collector<String> out) throws Exception {
+			out.collect("GOT: " + Joiner.on(",").join(input));
+		}
+	}
+
+	@Test
+	public void testCleanupTimerWithEmptyListStateForTumblingWindows() throws Exception {
+		final int WINDOW_SIZE = 2;
+		final long LATENESS = 1;
+
+		TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");
+
+		ListStateDescriptor<Tuple2<String, Integer>> windowStateDesc =
+			new ListStateDescriptor<>("window-contents", inputType.createSerializer(new ExecutionConfig()));
+
+		WindowOperator<String, Tuple2<String, Integer>, Iterable<Tuple2<String, Integer>>, Tuple2<String, Integer>, TimeWindow> operator =
+			new WindowOperator<>(
+				TumblingEventTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)),
+				new TimeWindow.Serializer(),
+				new TupleKeySelector(),
+				BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
+				windowStateDesc,
+				new InternalIterableWindowFunction<>(new PassThroughFunction()),
+				EventTimeTrigger.create(),
+				LATENESS);
+
+		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
+			new OneInputStreamOperatorTestHarness<>(operator);
+
+		testHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
+
+		operator.setInputType(inputType, new ExecutionConfig());
+		testHarness.open();
+
+		ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();
+
+		// normal element
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1000));
+		testHarness.processWatermark(new Watermark(1599));
+		testHarness.processWatermark(new Watermark(1999));
+		testHarness.processWatermark(new Watermark(2000));
+		testHarness.processWatermark(new Watermark(5000));
+
+		expected.add(new Watermark(1599));
+		expected.add(new StreamRecord<>(new Tuple2<>("key2", 1), 1999));
+		expected.add(new Watermark(1999)); // here it fires and purges
+		expected.add(new Watermark(2000)); // here is the cleanup timer
+		expected.add(new Watermark(5000));
+
+		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expected, testHarness.getOutput(), new Tuple2ResultSortComparator());
+		testHarness.close();
+	}
+
+	@Test
+	public void testCleanupTimerWithEmptyReduceStateForTumblingWindows() throws Exception {
+		final int WINDOW_SIZE = 2;
+		final long LATENESS = 1;
+
+		TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");
+
+		ReducingStateDescriptor<Tuple2<String, Integer>> stateDesc = new ReducingStateDescriptor<>("window-contents",
+			new SumReducer(),
+			inputType.createSerializer(new ExecutionConfig()));
+
+		WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> operator =
+			new WindowOperator<>(
+				TumblingEventTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)),
+				new TimeWindow.Serializer(),
+				new TupleKeySelector(),
+				BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
+				stateDesc,
+				new InternalSingleValueWindowFunction<>(new PassThroughWindowFunction<String, TimeWindow, Tuple2<String, Integer>>()),
+				EventTimeTrigger.create(),
+				LATENESS);
+
+		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
+			new OneInputStreamOperatorTestHarness<>(operator);
+
+		testHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
+
+		operator.setInputType(inputType, new ExecutionConfig());
+		testHarness.open();
+
+		ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();
+
+		// normal element
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1000));
+		testHarness.processWatermark(new Watermark(1599));
+		testHarness.processWatermark(new Watermark(1999));
+		testHarness.processWatermark(new Watermark(2000));
+		testHarness.processWatermark(new Watermark(5000));
+
+		expected.add(new Watermark(1599));
+		expected.add(new StreamRecord<>(new Tuple2<>("key2", 1), 1999));
+		expected.add(new Watermark(1999)); // here it fires and purges
+		expected.add(new Watermark(2000)); // here is the cleanup timer
+		expected.add(new Watermark(5000));
+
+		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expected, testHarness.getOutput(), new Tuple2ResultSortComparator());
+		testHarness.close();
+	}
+
+	@Test
+	public void testCleanupTimerWithEmptyFoldingStateForTumblingWindows() throws Exception {
+		final int WINDOW_SIZE = 2;
+		final long LATENESS = 1;
+
+		TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");
+
+		FoldingStateDescriptor<Tuple2<String, Integer>, Tuple2<String, Integer>> windowStateDesc =
+			new FoldingStateDescriptor<>(
+				"window-contents",
+				new Tuple2<>((String) null, 0),
+				new FoldFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
+					@Override
+					public Tuple2<String, Integer> fold(Tuple2<String, Integer> accumulator, Tuple2<String, Integer> value) throws Exception {
+						return new Tuple2<>(value.f0, accumulator.f1 + value.f1);
+					}
+				},
+				inputType);
+
+		WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> operator =
+			new WindowOperator<>(
+				TumblingEventTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)),
+				new TimeWindow.Serializer(),
+				new TupleKeySelector(),
+				BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
+				windowStateDesc,
+				new InternalSingleValueWindowFunction<>(new PassThroughFunction()),
+				EventTimeTrigger.create(),
+				LATENESS);
+
+		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
+			new OneInputStreamOperatorTestHarness<>(operator);
+
+		testHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
+
+		operator.setInputType(inputType, new ExecutionConfig());
+		testHarness.open();
+
+		ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();
+
+		// normal element
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1000));
+		testHarness.processWatermark(new Watermark(1599));
+		testHarness.processWatermark(new Watermark(1999));
+		testHarness.processWatermark(new Watermark(2000));
+		testHarness.processWatermark(new Watermark(5000));
+
+		expected.add(new Watermark(1599));
+		expected.add(new StreamRecord<>(new Tuple2<>("key2", 1), 1999));
+		expected.add(new Watermark(1999)); // here it fires and purges
+		expected.add(new Watermark(2000)); // here is the cleanup timer
+		expected.add(new Watermark(5000));
+
+		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expected, testHarness.getOutput(), new Tuple2ResultSortComparator());
+		testHarness.close();
+	}
+
+	@Test
+	public void testCleanupTimerWithEmptyListStateForSessionWindows() throws Exception {
+		final int GAP_SIZE = 3;
+		final long LATENESS = 10;
+
+		TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");
+
+		ListStateDescriptor<Tuple2<String, Integer>> windowStateDesc =
+			new ListStateDescriptor<>("window-contents", inputType.createSerializer(new ExecutionConfig()));
+
+		WindowOperator<String, Tuple2<String, Integer>, Iterable<Tuple2<String, Integer>>, Tuple2<String, Integer>, TimeWindow> operator =
+			new WindowOperator<>(
+				EventTimeSessionWindows.withGap(Time.seconds(GAP_SIZE)),
+				new TimeWindow.Serializer(),
+				new TupleKeySelector(),
+				BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
+				windowStateDesc,
+				new InternalIterableWindowFunction<>(new PassThroughFunction()),
+				EventTimeTrigger.create(),
+				LATENESS);
+
+		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
+			new OneInputStreamOperatorTestHarness<>(operator);
+
+		testHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
+
+		operator.setInputType(inputType, new ExecutionConfig());
+		testHarness.open();
+
+		ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();
+
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1000));
+		testHarness.processWatermark(new Watermark(4998));
+
+		expected.add(new StreamRecord<>(new Tuple2<>("key2", 1), 3999));
+		expected.add(new Watermark(4998));
+
+		testHarness.processWatermark(new Watermark(14600));
+		expected.add(new Watermark(14600));
+
+		ConcurrentLinkedQueue<Object> actual = testHarness.getOutput();
+		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expected, actual, new Tuple2ResultSortComparator());
+		testHarness.close();
+	}
+
+	@Test
+	public void testCleanupTimerWithEmptyReduceStateForSessionWindows() throws Exception {
+
+		final int GAP_SIZE = 3;
+		final long LATENESS = 10;
+
+		TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");
+
+		ReducingStateDescriptor<Tuple2<String, Integer>> stateDesc = new ReducingStateDescriptor<>("window-contents",
+			new SumReducer(),
+			inputType.createSerializer(new ExecutionConfig()));
+
+		WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple3<String, Long, Long>, TimeWindow> operator =
+			new WindowOperator<>(
+				EventTimeSessionWindows.withGap(Time.seconds(GAP_SIZE)),
+				new TimeWindow.Serializer(),
+				new TupleKeySelector(),
+				BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
+				stateDesc,
+				new InternalSingleValueWindowFunction<>(new ReducedSessionWindowFunction()),
+				EventTimeTrigger.create(),
+				LATENESS);
+
+		operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig());
+
+		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness =
+			new OneInputStreamOperatorTestHarness<>(operator);
+
+		testHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
+
+		testHarness.open();
+
+		ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();
+
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1000));
+		testHarness.processWatermark(new Watermark(4998));
+
+		expected.add(new StreamRecord<>(new Tuple3<>("key2-1", 1000L, 4000L), 3999));
+		expected.add(new Watermark(4998));
+
+		testHarness.processWatermark(new Watermark(14600));
+		expected.add(new Watermark(14600));
+
+		ConcurrentLinkedQueue<Object> actual = testHarness.getOutput();
+		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expected, actual, new Tuple2ResultSortComparator());
+		testHarness.close();
+	}
+
+	@Test
+	public void testCleanupTimerWithEmptyFoldingStateForSessionWindows() throws Exception {
+		final int GAP_SIZE = 3;
+		final long LATENESS = 10;
+
+		TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");
+
+		FoldingStateDescriptor<Tuple2<String, Integer>, Tuple2<String, Integer>> windowStateDesc =
+			new FoldingStateDescriptor<>(
+				"window-contents",
+				new Tuple2<>((String) null, 0),
+				new FoldFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
+					@Override
+					public Tuple2<String, Integer> fold(Tuple2<String, Integer> accumulator, Tuple2<String, Integer> value) throws Exception {
+						return new Tuple2<>(value.f0, accumulator.f1 + value.f1);
+					}
+				},
+				inputType);
+
+		WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> operator =
+			new WindowOperator<>(
+				EventTimeSessionWindows.withGap(Time.seconds(GAP_SIZE)),
+				new TimeWindow.Serializer(),
+				new TupleKeySelector(),
+				BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
+				windowStateDesc,
+				new InternalSingleValueWindowFunction<>(new PassThroughFunction()),
+				EventTimeTrigger.create(),
+				LATENESS);
+
+		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
+			new OneInputStreamOperatorTestHarness<>(operator);
+
+		testHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
+
+		operator.setInputType(inputType, new ExecutionConfig());
+		testHarness.open();
+
+		ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();
+
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1000));
+		testHarness.processWatermark(new Watermark(4998));
+
+		expected.add(new StreamRecord<>(new Tuple2<>("key2", 1), 3999));
+		expected.add(new Watermark(4998));
+
+		testHarness.processWatermark(new Watermark(14600));
+		expected.add(new Watermark(14600));
+
+		ConcurrentLinkedQueue<Object> actual = testHarness.getOutput();
+		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expected, actual, new Tuple2ResultSortComparator());
+		testHarness.close();
+	}
+
 	// ------------------------------------------------------------------------
 	//  UDFs
 	// ------------------------------------------------------------------------
 
+	private class PassThroughFunction implements WindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, TimeWindow> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public void apply(String k, TimeWindow window, Iterable<Tuple2<String, Integer>> input, Collector<Tuple2<String, Integer>> out) throws Exception {
+			for (Tuple2<String, Integer> in: input) {
+				out.collect(in);
+			}
+		}
+	}
+
 	public static class SumReducer implements ReduceFunction<Tuple2<String, Integer>> {
 		private static final long serialVersionUID = 1L;
 		@Override
@@ -2112,4 +2486,68 @@ public class WindowOperatorTest {
 			return Collections.singletonList(new TimeWindow(timestamp, timestamp + sessionTimeout));
 		}
 	}
+
+	/**
+	 * A trigger that fires at the end of the window but does not
+	 * purge the state of the fired window. This is to test the state
+	 * garbage collection mechanism.
+	 */
+	public class EventTimeTriggerAccumGC extends Trigger<Object, TimeWindow> {
+		private static final long serialVersionUID = 1L;
+
+		private long cleanupTime;
+
+		private EventTimeTriggerAccumGC() {
+			cleanupTime = 0L;
+		}
+
+		public EventTimeTriggerAccumGC(long cleanupTime) {
+			this.cleanupTime = cleanupTime;
+		}
+
+		@Override
+		public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
+			if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
+				// if the watermark is already past the window fire immediately
+				return TriggerResult.FIRE;
+			} else {
+				ctx.registerEventTimeTimer(window.maxTimestamp());
+				return TriggerResult.CONTINUE;
+			}
+		}
+
+		@Override
+		public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
+			return time == window.maxTimestamp() || time == window.maxTimestamp() + cleanupTime ?
+				TriggerResult.FIRE_AND_PURGE :
+				TriggerResult.CONTINUE;
+		}
+
+		@Override
+		public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
+			return TriggerResult.CONTINUE;
+		}
+
+		@Override
+		public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
+			ctx.deleteEventTimeTimer(window.maxTimestamp());
+		}
+
+		@Override
+		public boolean canMerge() {
+			return true;
+		}
+
+		@Override
+		public TriggerResult onMerge(TimeWindow window,
+									 OnMergeContext ctx) {
+			ctx.registerEventTimeTimer(window.maxTimestamp());
+			return TriggerResult.CONTINUE;
+		}
+
+		@Override
+		public String toString() {
+			return "EventTimeTrigger()";
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/12bf7c1a/flink-tests/src/test/java/org/apache/flink/test/windowing/sessionwindows/SessionWindowITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/windowing/sessionwindows/SessionWindowITCase.java b/flink-tests/src/test/java/org/apache/flink/test/windowing/sessionwindows/SessionWindowITCase.java
index eb137aa..9b4855f 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/windowing/sessionwindows/SessionWindowITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/windowing/sessionwindows/SessionWindowITCase.java
@@ -22,9 +22,7 @@ import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.datastream.WindowedStream;
-import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
 import org.apache.flink.streaming.api.functions.windowing.WindowFunction;

