flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aljos...@apache.org
Subject [09/10] flink git commit: [FLINK-4994] Don't Clear Trigger State and Merging Window Set When Purging
Date Mon, 23 Jan 2017 14:01:49 GMT
[FLINK-4994] Don't Clear Trigger State and Merging Window Set When Purging

Before this change, when a Trigger returned TriggerResult.PURGE from any of
the on*() methods, the WindowOperator would clear all state of that window
(window contents, merging window set) and call Trigger.clear() so that the
Trigger could clean up its state/timers.

This was problematic in some cases. For example, with merging windows (session
windows) it meant that a late-arriving element was not added to the session
that had previously been built up, but was instead put into a completely new
session containing only that one element.

The new behaviour is:
 * On PURGE, only clear the window contents
 * Register a cleanup timer for every window, and do not delete it on PURGE
 * When the cleanup timer fires: clear the window state, clear the merging
   window set, and call Trigger.clear() so the Trigger can clean up its
   state/timers


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a9189c20
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a9189c20
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a9189c20

Branch: refs/heads/release-1.2
Commit: a9189c2055fd3c10d741f63208aadca7fb4218f5
Parents: 704b411
Author: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Authored: Wed Nov 2 11:51:07 2016 +0100
Committer: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Committed: Mon Jan 23 14:53:22 2017 +0100

----------------------------------------------------------------------
 .../windowing/EvictingWindowOperator.java       | 118 ++++++++------
 .../operators/windowing/WindowOperator.java     | 155 +++++++++++--------
 .../operators/windowing/WindowOperatorTest.java |  11 +-
 3 files changed, 162 insertions(+), 122 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a9189c20/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
index 8c73878..17b3984 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
@@ -154,14 +154,13 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window>
extends Window
 						// if we have no state, there is nothing to do
 						continue;
 					}
-					fire(actualWindow, contents, windowState);
+					emitWindowContents(actualWindow, contents, windowState);
 				}
 
 				if (triggerResult.isPurge()) {
-					cleanup(actualWindow, windowState, mergingWindows);
-				} else {
-					registerCleanupTimer(actualWindow);
+					windowState.clear();
 				}
+				registerCleanupTimer(actualWindow);
 			}
 
 			mergingWindows.persist();
@@ -190,14 +189,13 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window>
extends Window
 						// if we have no state, there is nothing to do
 						continue;
 					}
-					fire(window, contents, windowState);
+					emitWindowContents(window, contents, windowState);
 				}
 
 				if (triggerResult.isPurge()) {
-					cleanup(window, windowState, null);
-				} else {
-					registerCleanupTimer(window);
+					windowState.clear();
 				}
+				registerCleanupTimer(window);
 			}
 		}
 	}
@@ -217,12 +215,14 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window>
extends Window
 			mergingWindows = getMergingWindowSet();
 			W stateWindow = mergingWindows.getStateWindow(context.window);
 			if (stateWindow == null) {
-				// then the window is already purged and this is a cleanup
-				// timer set due to allowed lateness that has nothing to clean,
-				// so it is safe to just ignore
-				return;
+				// timer firing for non-existent window, still have to run the cleanup logic
+				windowState = null;
+			} else {
+				windowState = getPartitionedState(
+						stateWindow,
+						windowSerializer,
+						windowStateDescriptor);
 			}
-			windowState = getPartitionedState(stateWindow, windowSerializer, windowStateDescriptor);
 		} else {
 			windowState = getPartitionedState(
 					context.window,
@@ -230,19 +230,28 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window>
extends Window
 					windowStateDescriptor);
 		}
 
-		Iterable<StreamRecord<IN>> contents = windowState.get();
-		if (contents == null) {
-			// if we have no state, there is nothing to do
-			return;
+		Iterable<StreamRecord<IN>> contents = null;
+		if (windowState != null) {
+			contents = windowState.get();
 		}
 
-		TriggerResult triggerResult = context.onEventTime(timer.getTimestamp());
-		if (triggerResult.isFire()) {
-			fire(context.window, contents, windowState);
+		if (contents != null) {
+			TriggerResult triggerResult = context.onEventTime(timer.getTimestamp());
+			if (triggerResult.isFire()) {
+				emitWindowContents(context.window, contents, windowState);
+			}
+			if (triggerResult.isPurge()) {
+				windowState.clear();
+			}
+		}
+
+		if (windowAssigner.isEventTime() && isCleanupTime(context.window, timer.getTimestamp()))
{
+			clearAllState(context.window, windowState, mergingWindows);
 		}
 
-		if (triggerResult.isPurge() || (windowAssigner.isEventTime() && isCleanupTime(context.window,
timer.getTimestamp()))) {
-			cleanup(context.window, windowState, mergingWindows);
+		if (mergingWindows != null) {
+			// need to make sure to update the merging state in state
+			mergingWindows.persist();
 		}
 	}
 
@@ -260,33 +269,44 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window>
extends Window
 			mergingWindows = getMergingWindowSet();
 			W stateWindow = mergingWindows.getStateWindow(context.window);
 			if (stateWindow == null) {
-				// then the window is already purged and this is a cleanup
-				// timer set due to allowed lateness that has nothing to clean,
-				// so it is safe to just ignore
-				return;
+				// timer firing for non-existent window, still have to run the cleanup logic
+				windowState = null;
+			} else {
+				windowState = getPartitionedState(
+						stateWindow,
+						windowSerializer,
+						windowStateDescriptor);
 			}
-			windowState = getPartitionedState(stateWindow, windowSerializer, windowStateDescriptor);
 		} else {
 			windowState = getPartitionedState(context.window, windowSerializer, windowStateDescriptor);
 		}
 
-		Iterable<StreamRecord<IN>> contents = windowState.get();
-		if (contents == null) {
-			// if we have no state, there is nothing to do
-			return;
+		Iterable<StreamRecord<IN>> contents = null;
+		if (windowState != null) {
+			contents = windowState.get();
 		}
 
-		TriggerResult triggerResult = context.onProcessingTime(timer.getTimestamp());
-		if (triggerResult.isFire()) {
-			fire(context.window, contents, windowState);
+		if (contents != null) {
+			TriggerResult triggerResult = context.onProcessingTime(timer.getTimestamp());
+			if (triggerResult.isFire()) {
+				emitWindowContents(context.window, contents, windowState);
+			}
+			if (triggerResult.isPurge()) {
+				windowState.clear();
+			}
 		}
 
-		if (triggerResult.isPurge() || (!windowAssigner.isEventTime() && isCleanupTime(context.window,
timer.getTimestamp()))) {
-			cleanup(context.window, windowState, mergingWindows);
+		if (!windowAssigner.isEventTime() && isCleanupTime(context.window, timer.getTimestamp()))
{
+			clearAllState(context.window, windowState, mergingWindows);
+		}
+
+		if (mergingWindows != null) {
+			// need to make sure to update the merging state in state
+			mergingWindows.persist();
 		}
 	}
 
-	private void fire(W window, Iterable<StreamRecord<IN>> contents, ListState<StreamRecord<IN>>
windowState) throws Exception {
+	private void emitWindowContents(W window, Iterable<StreamRecord<IN>> contents,
ListState<StreamRecord<IN>> windowState) throws Exception {
 		timestampedCollector.setAbsoluteTimestamp(window.maxTimestamp());
 
 		// Work around type system restrictions...
@@ -320,6 +340,18 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window>
extends Window
 		}
 	}
 
+	private void clearAllState(
+			W window,
+			ListState<StreamRecord<IN>> windowState,
+			MergingWindowSet<W> mergingWindows) throws Exception {
+
+		windowState.clear();
+		context.clear();
+		if (mergingWindows != null) {
+			mergingWindows.retireWindow(window);
+			mergingWindows.persist();
+		}
+	}
 
 	/**
 	 * {@code EvictorContext} is a utility for handling {@code Evictor} invocations. It can
be reused
@@ -366,18 +398,6 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window>
extends Window
 		}
 	}
 
-	private void cleanup(W window,
-						ListState<StreamRecord<IN>> windowState,
-						MergingWindowSet<W> mergingWindows) throws Exception {
-
-		windowState.clear();
-		if (mergingWindows != null) {
-			mergingWindows.retireWindow(window);
-			mergingWindows.persist();
-		}
-		context.clear();
-	}
-
 	@Override
 	public void open() throws Exception {
 		super.open();

http://git-wip-us.apache.org/repos/asf/flink/blob/a9189c20/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
index 5ed5a4e..3144b6d 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
@@ -348,14 +348,13 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 					if (contents == null) {
 						continue;
 					}
-					fire(actualWindow, contents);
+					emitWindowContents(actualWindow, contents);
 				}
 
 				if (triggerResult.isPurge()) {
-					cleanup(actualWindow, windowState, mergingWindows);
-				} else {
-					registerCleanupTimer(actualWindow);
+					windowState.clear();
 				}
+				registerCleanupTimer(actualWindow);
 			}
 
 			// need to make sure to update the merging state in state
@@ -382,14 +381,13 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 					if (contents == null) {
 						continue;
 					}
-					fire(window, contents);
+					emitWindowContents(window, contents);
 				}
 
 				if (triggerResult.isPurge()) {
-					cleanup(window, windowState, null);
-				} else {
-					registerCleanupTimer(window);
+					windowState.clear();
 				}
+				registerCleanupTimer(window);
 			}
 		}
 	}
@@ -406,32 +404,40 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 			mergingWindows = getMergingWindowSet();
 			W stateWindow = mergingWindows.getStateWindow(context.window);
 			if (stateWindow == null) {
-				// then the window is already purged and this is a cleanup
-				// timer set due to allowed lateness that has nothing to clean,
-				// so it is safe to just ignore
-				return;
+				// timer firing for non-existent window, ignore
+				windowState = null;
+			} else {
+				windowState = getPartitionedState(
+						stateWindow,
+						windowSerializer,
+						windowStateDescriptor);
 			}
-			windowState = getPartitionedState(stateWindow, windowSerializer, windowStateDescriptor);
 		} else {
-			windowState = getPartitionedState(
-					context.window,
-					windowSerializer,
-					windowStateDescriptor);
+			windowState = getPartitionedState(context.window, windowSerializer, windowStateDescriptor);
 		}
 
-		ACC contents = windowState.get();
-		if (contents == null) {
-			// if we have no state, there is nothing to do
-			return;
+		ACC contents = null;
+		if (windowState != null) {
+			contents = windowState.get();
 		}
 
-		TriggerResult triggerResult = context.onEventTime(timer.getTimestamp());
-		if (triggerResult.isFire()) {
-			fire(context.window, contents);
+		if (contents != null) {
+			TriggerResult triggerResult = context.onEventTime(timer.getTimestamp());
+			if (triggerResult.isFire()) {
+				emitWindowContents(context.window, contents);
+			}
+			if (triggerResult.isPurge()) {
+				windowState.clear();
+			}
 		}
 
-		if (triggerResult.isPurge() || (windowAssigner.isEventTime() && isCleanupTime(context.window,
timer.getTimestamp()))) {
-			cleanup(context.window, windowState, mergingWindows);
+		if (windowAssigner.isEventTime() && isCleanupTime(context.window, timer.getTimestamp()))
{
+			clearAllState(context.window, windowState, mergingWindows);
+		}
+
+		if (mergingWindows != null) {
+			// need to make sure to update the merging state in state
+			mergingWindows.persist();
 		}
 	}
 
@@ -447,54 +453,67 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 			mergingWindows = getMergingWindowSet();
 			W stateWindow = mergingWindows.getStateWindow(context.window);
 			if (stateWindow == null) {
-				// then the window is already purged and this is a cleanup
-				// timer set due to allowed lateness that has nothing to clean,
-				// so it is safe to just ignore
-				return;
+				// timer firing for non-existent window, ignore
+				windowState = null;
+			} else {
+				windowState = getPartitionedState(
+						stateWindow,
+						windowSerializer,
+						windowStateDescriptor);
 			}
-			windowState = getPartitionedState(stateWindow, windowSerializer, windowStateDescriptor);
 		} else {
 			windowState = getPartitionedState(context.window, windowSerializer, windowStateDescriptor);
 		}
 
-		ACC contents = windowState.get();
-		if (contents == null) {
-			// if we have no state, there is nothing to do
-			return;
+		ACC contents = null;
+		if (windowState != null) {
+			contents = windowState.get();
 		}
 
-		TriggerResult triggerResult = context.onProcessingTime(timer.getTimestamp());
-		if (triggerResult.isFire()) {
-			fire(context.window, contents);
+		if (contents != null) {
+			TriggerResult triggerResult = context.onProcessingTime(timer.getTimestamp());
+			if (triggerResult.isFire()) {
+				emitWindowContents(context.window, contents);
+			}
+			if (triggerResult.isPurge()) {
+				windowState.clear();
+			}
 		}
 
-		if (triggerResult.isPurge() || (!windowAssigner.isEventTime() && isCleanupTime(context.window,
timer.getTimestamp()))) {
-			cleanup(context.window, windowState, mergingWindows);
+		if (!windowAssigner.isEventTime() && isCleanupTime(context.window, timer.getTimestamp()))
{
+			clearAllState(context.window, windowState, mergingWindows);
+		}
+
+		if (mergingWindows != null) {
+			// need to make sure to update the merging state in state
+			mergingWindows.persist();
 		}
 	}
 
 	/**
-	 * Cleans up the window state if the provided {@link TriggerResult} requires so, or if it
-	 * is time to do so (see {@link #isCleanupTime(Window, long)}). The caller must ensure that
the
+	 * Drops all state for the given window and calls
+	 * {@link Trigger#clear(Window, Trigger.TriggerContext)}.
+	 *
+	 * <p>The caller must ensure that the
 	 * correct key is set in the state backend and the context object.
 	 */
-	private void cleanup(W window,
-						AppendingState<IN, ACC> windowState,
-						MergingWindowSet<W> mergingWindows) throws Exception {
+	private void clearAllState(
+			W window,
+			AppendingState<IN, ACC> windowState,
+			MergingWindowSet<W> mergingWindows) throws Exception {
 		windowState.clear();
+		context.clear();
 		if (mergingWindows != null) {
 			mergingWindows.retireWindow(window);
 			mergingWindows.persist();
 		}
-		context.clear();
 	}
 
 	/**
-	 * Triggers the window computation if the provided {@link TriggerResult} requires so.
-	 * The caller must ensure that the correct key is set in the state backend and the context
object.
+	 * Emits the contents of the given window using the {@link InternalWindowFunction}.
 	 */
 	@SuppressWarnings("unchecked")
-	private void fire(W window, ACC contents) throws Exception {
+	private void emitWindowContents(W window, ACC contents) throws Exception {
 		timestampedCollector.setAbsoluteTimestamp(window.maxTimestamp());
 		userFunction.apply(context.key, context.window, contents, timestampedCollector);
 	}
@@ -517,12 +536,8 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 	}
 
 	/**
-	 * Decides if a window is currently late or not, based on the current
-	 * watermark, i.e. the current event time, and the allowed lateness.
-	 * @param window
-	 * 					The collection of windows returned by the {@link WindowAssigner}.
-	 * @return The windows (among the {@code eligibleWindows}) for which the element should
still be
-	 * 					considered when triggering.
+	 * Returns {@code true} if the watermark is after the end timestamp plus the allowed lateness
+	 * of the given window.
 	 */
 	protected boolean isLate(W window) {
 		return (windowAssigner.isEventTime() && (cleanupTime(window) <= internalTimerService.currentWatermark()));
@@ -535,6 +550,11 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 	 */
 	protected void registerCleanupTimer(W window) {
 		long cleanupTime = cleanupTime(window);
+		if (cleanupTime == Long.MAX_VALUE) {
+			// don't set a GC timer for "end of time"
+			return;
+		}
+
 		if (windowAssigner.isEventTime()) {
 			context.registerEventTimeTimer(cleanupTime);
 		} else {
@@ -549,6 +569,10 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 	 */
 	protected void deleteCleanupTimer(W window) {
 		long cleanupTime = cleanupTime(window);
+		if (cleanupTime == Long.MAX_VALUE) {
+			// no need to clean up because we didn't set one
+			return;
+		}
 		if (windowAssigner.isEventTime()) {
 			context.deleteEventTimeTimer(cleanupTime);
 		} else {
@@ -566,24 +590,19 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 	 * @param window the window whose cleanup time we are computing.
 	 */
 	private long cleanupTime(W window) {
-		long cleanupTime = window.maxTimestamp() + allowedLateness;
-		return cleanupTime >= window.maxTimestamp() ? cleanupTime : Long.MAX_VALUE;
+		if (windowAssigner.isEventTime()) {
+			long cleanupTime = window.maxTimestamp() + allowedLateness;
+			return cleanupTime >= window.maxTimestamp() ? cleanupTime : Long.MAX_VALUE;
+		} else {
+			return window.maxTimestamp();
+		}
 	}
 
 	/**
-	 * Decides if it is time to clean up the window state.
-	 * Clean up time for a window is:
-	 * 		<li> if it is event time, after the watermark passes the end of the window plus
the user-specified allowed lateness
-	 * 		<li> if it is processing time, after the processing time at the node passes the
end of the window.
-	 * 	@param window
-	 * 					the window to clean
-	 *  @param time
-	 *  				the current time (event or processing depending on the {@link WindowAssigner}
-	 *  @return {@code true} if it is time to clean up the window state, {@code false} otherwise.
+	 * Returns {@code true} if the given time is the cleanup time for the given window.
 	 */
 	protected final boolean isCleanupTime(W window, long time) {
-		long cleanupTime = cleanupTime(window);
-		return  cleanupTime == time;
+		return time == cleanupTime(window);
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/a9189c20/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
index 0e2d1e8..e682e2d 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
@@ -1599,20 +1599,21 @@ public class WindowOperatorTest extends TestLogger {
 		expected.add(new StreamRecord<>(new Tuple3<>("key2-1", 11600L, 14600L), 14599));
 		expected.add(new Watermark(14600));
 
-		// dropped as late
 		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 10000));
 
+		expected.add(new StreamRecord<>(new Tuple3<>("key2-1", 10000L, 14600L), 14599));
+
 		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 14500));
 		testHarness.processWatermark(new Watermark(20000));
 
-		expected.add(new StreamRecord<>(new Tuple3<>("key2-1", 14500L, 17500L), 17499));
+		expected.add(new StreamRecord<>(new Tuple3<>("key2-1", 10000L, 17500L), 17499));
 		expected.add(new Watermark(20000));
 
 		testHarness.processWatermark(new Watermark(100000));
 		expected.add(new Watermark(100000));
 
 		ConcurrentLinkedQueue<Object> actual = testHarness.getOutput();
-		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expected, actual, new
Tuple2ResultSortComparator());
+		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expected, actual, new
Tuple3ResultSortComparator());
 		testHarness.close();
 	}
 
@@ -1778,7 +1779,7 @@ public class WindowOperatorTest extends TestLogger {
 
 		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 10000));
 
-		expected.add(new StreamRecord<>(new Tuple3<>("key2-1", 10000L, 13000L), 12999));
+		expected.add(new StreamRecord<>(new Tuple3<>("key2-1", 1000L, 14600L), 14599));
 
 		ConcurrentLinkedQueue<Object> actual = testHarness.getOutput();
 		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expected, actual, new
Tuple3ResultSortComparator());
@@ -1786,7 +1787,7 @@ public class WindowOperatorTest extends TestLogger {
 		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 14500));
 		testHarness.processWatermark(new Watermark(20000));
 
-		expected.add(new StreamRecord<>(new Tuple3<>("key2-1", 14500L, 17500L), 17499));
+		expected.add(new StreamRecord<>(new Tuple3<>("key2-1", 1000L, 17500L), 17499));
 		expected.add(new Watermark(20000));
 
 		testHarness.processWatermark(new Watermark(100000));


Mime
View raw message