flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From u..@apache.org
Subject flink git commit: [FLINK-4239] Set Default Allowed Lateness to Zero and Make Triggers Non-Purging
Date Tue, 26 Jul 2016 09:32:56 GMT
Repository: flink
Updated Branches:
  refs/heads/master f0ac261ad -> 0b4c04d7d


[FLINK-4239] Set Default Allowed Lateness to Zero and Make Triggers Non-Purging

This closes #2278.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0b4c04d7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0b4c04d7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0b4c04d7

Branch: refs/heads/master
Commit: 0b4c04d7dd610091cc7597357923be06d5be7d65
Parents: f0ac261
Author: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Authored: Thu Jul 21 15:10:54 2016 +0200
Committer: Ufuk Celebi <uce@apache.org>
Committed: Tue Jul 26 11:32:42 2016 +0200

----------------------------------------------------------------------
 docs/apis/streaming/windows.md                  |  3 +
 .../api/datastream/AllWindowedStream.java       | 12 +--
 .../api/datastream/WindowedStream.java          | 12 +--
 .../windowing/triggers/EventTimeTrigger.java    |  4 +-
 .../triggers/ProcessingTimeTrigger.java         |  2 +-
 .../api/windowing/triggers/PurgingTrigger.java  | 36 +--------
 .../operators/windowing/WindowOperatorTest.java | 84 +++-----------------
 .../sessionwindows/SessionWindowITCase.java     | 63 +--------------
 8 files changed, 39 insertions(+), 177 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0b4c04d7/docs/apis/streaming/windows.md
----------------------------------------------------------------------
diff --git a/docs/apis/streaming/windows.md b/docs/apis/streaming/windows.md
index fba17fc..7a93723 100644
--- a/docs/apis/streaming/windows.md
+++ b/docs/apis/streaming/windows.md
@@ -515,6 +515,9 @@ and are considered when computing window results. If elements arrive after the a
 will be dropped. Flink will also make sure that any state held by the windowing operation is garbage
 collected once the watermark passes the end of a window plus the allowed lateness.
 
+<span class="label label-info">Default</span> By default, the allowed lateness is set to
+`0`. That is, elements that arrive behind the watermark will be dropped.
+
 You can specify an allowed lateness like this:
 
 <div class="codetabs" markdown="1">

http://git-wip-us.apache.org/repos/asf/flink/blob/0b4c04d7/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
index e5dacc0..fa3b90d 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
@@ -89,7 +89,7 @@ public class AllWindowedStream<T, W extends Window> {
 	private Evictor<? super T, ? super W> evictor;
 
 	/** The user-specified allowed lateness. */
-	private long allowedLateness = Long.MAX_VALUE;
+	private long allowedLateness = 0L;
 
 	@PublicEvolving
 	public AllWindowedStream(DataStream<T> input,
@@ -113,11 +113,11 @@ public class AllWindowedStream<T, W extends Window> {
 	}
 
 	/**
-	 * Sets the allowed lateness to a user-specified value.
-	 * If not explicitly set, the allowed lateness is {@link Long#MAX_VALUE}.
-	 * Setting the allowed lateness is only valid for event-time windows.
-	 * If a value different than 0 is provided with a processing-time
-	 * {@link WindowAssigner}, then an exception is thrown.
+	 * Sets the time by which elements are allowed to be late. Elements that
+	 * arrive behind the watermark by more than the specified time will be dropped.
+	 * By default, the allowed lateness is {@code 0L}.
+	 *
+	 * <p>Setting an allowed lateness is only valid for event-time windows.
 	 */
 	@PublicEvolving
 	public AllWindowedStream<T, W> allowedLateness(Time lateness) {

http://git-wip-us.apache.org/repos/asf/flink/blob/0b4c04d7/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
index 6110480..e81d7af 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
@@ -100,7 +100,7 @@ public class WindowedStream<T, K, W extends Window> {
 	private Evictor<? super T, ? super W> evictor;
 
 	/** The user-specified allowed lateness. */
-	private long allowedLateness = Long.MAX_VALUE;
+	private long allowedLateness = 0L;
 
 	@PublicEvolving
 	public WindowedStream(KeyedStream<T, K> input,
@@ -124,11 +124,11 @@ public class WindowedStream<T, K, W extends Window> {
 	}
 
 	/**
-	 * Sets the allowed lateness to a user-specified value.
-	 * If not explicitly set, the allowed lateness is {@link Long#MAX_VALUE}.
-	 * Setting the allowed lateness is only valid for event-time windows.
-	 * If a value different than 0 is provided with a processing-time
-	 * {@link WindowAssigner}, then an exception is thrown.
+	 * Sets the time by which elements are allowed to be late. Elements that
+	 * arrive behind the watermark by more than the specified time will be dropped.
+	 * By default, the allowed lateness is {@code 0L}.
+	 *
+	 * <p>Setting an allowed lateness is only valid for event-time windows.
 	 */
 	@PublicEvolving
 	public WindowedStream<T, K, W> allowedLateness(Time lateness) {

http://git-wip-us.apache.org/repos/asf/flink/blob/0b4c04d7/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/EventTimeTrigger.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/EventTimeTrigger.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/EventTimeTrigger.java
index 96e862f..da14ffd 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/EventTimeTrigger.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/EventTimeTrigger.java
@@ -37,7 +37,7 @@ public class EventTimeTrigger extends Trigger<Object, TimeWindow>
{
 	public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext
ctx) throws Exception {
 		if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
 			// if the watermark is already past the window fire immediately
-			return TriggerResult.FIRE_AND_PURGE;
+			return TriggerResult.FIRE;
 		} else {
 			ctx.registerEventTimeTimer(window.maxTimestamp());
 			return TriggerResult.CONTINUE;
@@ -47,7 +47,7 @@ public class EventTimeTrigger extends Trigger<Object, TimeWindow>
{
 	@Override
 	public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
 		return time == window.maxTimestamp() ?
-			TriggerResult.FIRE_AND_PURGE :
+			TriggerResult.FIRE :
 			TriggerResult.CONTINUE;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0b4c04d7/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java
index 8ea6a43..a010286 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java
@@ -44,7 +44,7 @@ public class ProcessingTimeTrigger extends Trigger<Object, TimeWindow>
{
 
 	@Override
 	public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx)
{
-		return TriggerResult.FIRE_AND_PURGE;
+		return TriggerResult.FIRE;
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/0b4c04d7/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/PurgingTrigger.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/PurgingTrigger.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/PurgingTrigger.java
index 289702a..85d0b52 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/PurgingTrigger.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/PurgingTrigger.java
@@ -44,40 +44,19 @@ public class PurgingTrigger<T, W extends Window> extends Trigger<T,
W> {
 	@Override
 	public TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx)
throws Exception {
 		TriggerResult triggerResult = nestedTrigger.onElement(element, timestamp, window, ctx);
-		switch (triggerResult) {
-			case FIRE:
-				return TriggerResult.FIRE_AND_PURGE;
-			case FIRE_AND_PURGE:
-				return TriggerResult.FIRE_AND_PURGE;
-			default:
-				return TriggerResult.CONTINUE;
-		}
+		return triggerResult.isFire() ? TriggerResult.FIRE_AND_PURGE : triggerResult;
 	}
 
 	@Override
 	public TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception
{
 		TriggerResult triggerResult = nestedTrigger.onEventTime(time, window, ctx);
-		switch (triggerResult) {
-			case FIRE:
-				return TriggerResult.FIRE_AND_PURGE;
-			case FIRE_AND_PURGE:
-				return TriggerResult.FIRE_AND_PURGE;
-			default:
-				return TriggerResult.CONTINUE;
-		}
+		return triggerResult.isFire() ? TriggerResult.FIRE_AND_PURGE : triggerResult;
 	}
 
 	@Override
 	public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception
{
 		TriggerResult triggerResult = nestedTrigger.onProcessingTime(time, window, ctx);
-		switch (triggerResult) {
-			case FIRE:
-				return TriggerResult.FIRE_AND_PURGE;
-			case FIRE_AND_PURGE:
-				return TriggerResult.FIRE_AND_PURGE;
-			default:
-				return TriggerResult.CONTINUE;
-		}
+		return triggerResult.isFire() ? TriggerResult.FIRE_AND_PURGE : triggerResult;
 	}
 
 	@Override
@@ -93,14 +72,7 @@ public class PurgingTrigger<T, W extends Window> extends Trigger<T,
W> {
 	@Override
 	public TriggerResult onMerge(W window, OnMergeContext ctx) throws Exception {
 		TriggerResult triggerResult = nestedTrigger.onMerge(window, ctx);
-		switch (triggerResult) {
-			case FIRE:
-				return TriggerResult.FIRE_AND_PURGE;
-			case FIRE_AND_PURGE:
-				return TriggerResult.FIRE_AND_PURGE;
-			default:
-				return TriggerResult.CONTINUE;
-		}
+		return triggerResult.isPurge() ? TriggerResult.FIRE_AND_PURGE : triggerResult;
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/0b4c04d7/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
index ba335ee..90bd3f2 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
@@ -51,8 +51,6 @@ import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
 import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
 import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
 import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger;
-import org.apache.flink.streaming.api.windowing.triggers.Trigger;
-import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
 import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 import org.apache.flink.streaming.api.windowing.windows.Window;
@@ -1154,7 +1152,7 @@ public class WindowOperatorTest {
 				BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
 				stateDesc,
 				new InternalSingleValueWindowFunction<>(new PassThroughWindowFunction<String,
TimeWindow, Tuple2<String, Integer>>()),
-				EventTimeTrigger.create(),
+				PurgingTrigger.of(EventTimeTrigger.create()),
 				LATENESS);
 
 		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>>
testHarness =
@@ -1409,7 +1407,7 @@ public class WindowOperatorTest {
 	}
 
 	@Test
-	public void testDropDueToLatenessSessionZeroLateness() throws Exception {
+	public void testDropDueToLatenessSessionZeroLatenessPurgingTrigger() throws Exception {
 		final int GAP_SIZE = 3;
 		final long LATENESS = 0;
 
@@ -1427,7 +1425,7 @@ public class WindowOperatorTest {
 				BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
 				stateDesc,
 				new InternalSingleValueWindowFunction<>(new ReducedSessionWindowFunction()),
-				EventTimeTrigger.create(),
+				PurgingTrigger.of(EventTimeTrigger.create()),
 				LATENESS);
 
 		operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String,
Integer>"), new ExecutionConfig());
@@ -1497,7 +1495,7 @@ public class WindowOperatorTest {
 	}
 
 	@Test
-	public void testDropDueToLatenessSessionZeroLatenessAccum() throws Exception {
+	public void testDropDueToLatenessSessionZeroLateness() throws Exception {
 		// same as testDropDueToLatenessSessionZeroLateness() but with an accumulating trigger, i.e.
 		// one that does not return FIRE_AND_PURGE when firing but just FIRE
 
@@ -1521,7 +1519,7 @@ public class WindowOperatorTest {
 				BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
 				stateDesc,
 				new InternalSingleValueWindowFunction<>(new ReducedSessionWindowFunction()),
-				new EventTimeTriggerAccum(),
+				EventTimeTrigger.create(),
 				LATENESS);
 
 		operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String,
Integer>"), new ExecutionConfig());
@@ -1587,7 +1585,7 @@ public class WindowOperatorTest {
 	}
 
 	@Test
-	public void testDropDueToLatenessSessionWithLateness() throws Exception {
+	public void testDropDueToLatenessSessionWithLatenessPurgingTrigger() throws Exception {
 
 		// this has the same output as testDropDueToLatenessSessionZeroLateness() because
 		// the allowed lateness is too small to make a difference
@@ -1609,7 +1607,7 @@ public class WindowOperatorTest {
 				BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
 				stateDesc,
 				new InternalSingleValueWindowFunction<>(new ReducedSessionWindowFunction()),
-				EventTimeTrigger.create(),
+				PurgingTrigger.of(EventTimeTrigger.create()),
 				LATENESS);
 
 		operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String,
Integer>"), new ExecutionConfig());
@@ -1675,7 +1673,7 @@ public class WindowOperatorTest {
 	}
 
 	@Test
-	public void testDropDueToLatenessSessionWithLatenessAccum() throws Exception {
+	public void testDropDueToLatenessSessionWithLateness() throws Exception {
 		// same as testDropDueToLatenessSessionWithLateness() but with an accumulating trigger, i.e.
 		// one that does not return FIRE_AND_PURGE when firing but just FIRE. The expected
 		// results are therefore slightly different.
@@ -1697,7 +1695,7 @@ public class WindowOperatorTest {
 				BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
 				stateDesc,
 				new InternalSingleValueWindowFunction<>(new ReducedSessionWindowFunction()),
-				new EventTimeTriggerAccum(),
+				EventTimeTrigger.create(),
 				LATENESS);
 
 		operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String,
Integer>"), new ExecutionConfig());
@@ -1775,7 +1773,7 @@ public class WindowOperatorTest {
 	}
 
 	@Test
-	public void testDropDueToLatenessSessionWithHugeLateness() throws Exception {
+	public void testDropDueToLatenessSessionWithHugeLatenessPurgingTrigger() throws Exception
{
 
 		final int GAP_SIZE = 3;
 		final long LATENESS = 10000;
@@ -1794,7 +1792,7 @@ public class WindowOperatorTest {
 				BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
 				stateDesc,
 				new InternalSingleValueWindowFunction<>(new ReducedSessionWindowFunction()),
-				EventTimeTrigger.create(),
+				PurgingTrigger.of(EventTimeTrigger.create()),
 				LATENESS);
 
 		operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String,
Integer>"), new ExecutionConfig());
@@ -1865,7 +1863,7 @@ public class WindowOperatorTest {
 	}
 
 	@Test
-	public void testDropDueToLatenessSessionWithHugeLatenessAccum() throws Exception {
+	public void testDropDueToLatenessSessionWithHugeLateness() throws Exception {
 		final int GAP_SIZE = 3;
 		final long LATENESS = 10000;
 
@@ -1883,7 +1881,7 @@ public class WindowOperatorTest {
 				BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
 				stateDesc,
 				new InternalSingleValueWindowFunction<>(new ReducedSessionWindowFunction()),
-				new EventTimeTriggerAccum(),
+				EventTimeTrigger.create(),
 				LATENESS);
 
 		operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String,
Integer>"), new ExecutionConfig());
@@ -2114,60 +2112,4 @@ public class WindowOperatorTest {
 			return Collections.singletonList(new TimeWindow(timestamp, timestamp + sessionTimeout));
 		}
 	}
-
-	/**
-	 * A trigger that fires at the end of the window but does not
-	 * purge the state of the fired window. This is to test the state
-	 * garbage collection mechanism.
-	 */
-	public class EventTimeTriggerAccum extends Trigger<Object, TimeWindow> {
-		private static final long serialVersionUID = 1L;
-
-		private EventTimeTriggerAccum() {}
-
-		@Override
-		public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext
ctx) throws Exception {
-			if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
-				// if the watermark is already past the window fire immediately
-				return TriggerResult.FIRE;
-			} else {
-				ctx.registerEventTimeTimer(window.maxTimestamp());
-				return TriggerResult.CONTINUE;
-			}
-		}
-
-		@Override
-		public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
-			return time == window.maxTimestamp() ?
-				TriggerResult.FIRE :
-				TriggerResult.CONTINUE;
-		}
-
-		@Override
-		public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx)
throws Exception {
-			return TriggerResult.CONTINUE;
-		}
-
-		@Override
-		public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
-			ctx.deleteEventTimeTimer(window.maxTimestamp());
-		}
-
-		@Override
-		public boolean canMerge() {
-			return true;
-		}
-
-		@Override
-		public TriggerResult onMerge(TimeWindow window,
-									 OnMergeContext ctx) {
-			ctx.registerEventTimeTimer(window.maxTimestamp());
-			return TriggerResult.CONTINUE;
-		}
-
-		@Override
-		public String toString() {
-			return "EventTimeTrigger()";
-		}
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0b4c04d7/flink-tests/src/test/java/org/apache/flink/test/windowing/sessionwindows/SessionWindowITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/windowing/sessionwindows/SessionWindowITCase.java b/flink-tests/src/test/java/org/apache/flink/test/windowing/sessionwindows/SessionWindowITCase.java
index 26f1bcc..eb137aa 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/windowing/sessionwindows/SessionWindowITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/windowing/sessionwindows/SessionWindowITCase.java
@@ -32,6 +32,7 @@ import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
+import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger;
 import org.apache.flink.streaming.api.windowing.triggers.Trigger;
 import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
@@ -114,8 +115,8 @@ public class SessionWindowITCase extends StreamingMultipleProgramsTestBase
{
 			windowedStream = windowedStream.allowedLateness(Time.milliseconds(ALLOWED_LATENESS_MS));
 		}
 
-		if (!PURGE_WINDOW_ON_FIRE) {
-			windowedStream = windowedStream.trigger(new NonPurgingEventTimeTriggerWrapper());
+		if (PURGE_WINDOW_ON_FIRE) {
+			windowedStream = windowedStream.trigger(PurgingTrigger.of(EventTimeTrigger.create()));
 		}
 
 		windowedStream.apply(windowFunction).print();
@@ -284,60 +285,4 @@ public class SessionWindowITCase extends StreamingMultipleProgramsTestBase
{
 			isRunning = false;
 		}
 	}
-
-	/**
-	 * Wrapper class that converts purging triggers into non-purging ones
-	 */
-	private static final class NonPurgingEventTimeTriggerWrapper
-			extends Trigger<SessionEvent<Integer, TestEventPayload>, TimeWindow> {
-
-		static final long serialVersionUID = 34763482396L;
-
-		EventTimeTrigger delegate = EventTimeTrigger.create();
-
-		@Override
-		public TriggerResult onElement(
-				SessionEvent<Integer, TestEventPayload> element,
-				long timestamp,
-				TimeWindow window,
-				TriggerContext ctx) throws Exception {
-			return removePurging(delegate.onElement(element, timestamp, window, ctx));
-
-		}
-
-		@Override
-		public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx)
throws Exception {
-			return removePurging(delegate.onProcessingTime(time, window, ctx));
-		}
-
-		@Override
-		public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws
Exception {
-			return removePurging(delegate.onEventTime(time, window, ctx));
-		}
-
-		@Override
-		public boolean canMerge() {
-			return delegate.canMerge();
-		}
-
-		@Override
-		public TriggerResult onMerge(TimeWindow window, OnMergeContext ctx) throws Exception {
-			return removePurging(delegate.onMerge(window, ctx));
-		}
-
-		@Override
-		public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
-			delegate.clear(window, ctx);
-		}
-
-		private TriggerResult removePurging(TriggerResult result) {
-			if (TriggerResult.PURGE == result) {
-				return TriggerResult.CONTINUE;
-			} else if (TriggerResult.FIRE_AND_PURGE == result) {
-				return TriggerResult.FIRE;
-			} else {
-				return result;
-			}
-		}
-	}
-}
\ No newline at end of file
+}


Mime
View raw message