flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [13/16] flink git commit: [FLINK-3200] Fix Triggers by introducing clear() method to clean up state/triggers
Date Wed, 03 Feb 2016 20:12:32 GMT
[FLINK-3200] Fix Triggers by introducing clear() method to clean up state/triggers


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/456d0aba
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/456d0aba
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/456d0aba

Branch: refs/heads/master
Commit: 456d0abaf7722ab16d91c5e2b52d80c076513921
Parents: e4d05f7
Author: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Authored: Tue Feb 2 18:03:06 2016 +0100
Committer: Stephan Ewen <sewen@apache.org>
Committed: Wed Feb 3 20:28:32 2016 +0100

----------------------------------------------------------------------
 .../examples/windowing/SessionWindowing.java    |  19 +++-
 .../api/windowing/assigners/GlobalWindows.java  |   3 +
 .../triggers/ContinuousEventTimeTrigger.java    |   3 +
 .../ContinuousProcessingTimeTrigger.java        |   3 +
 .../api/windowing/triggers/CountTrigger.java    |   5 +
 .../api/windowing/triggers/DeltaTrigger.java    |   5 +
 .../windowing/triggers/EventTimeTrigger.java    |   5 +
 .../triggers/ProcessingTimeTrigger.java         |   5 +
 .../api/windowing/triggers/PurgingTrigger.java  |   5 +
 .../api/windowing/triggers/Trigger.java         |  17 ++++
 .../windowing/NonKeyedWindowOperator.java       |  25 +++++
 .../operators/windowing/WindowOperator.java     | 100 ++++++++++++-------
 12 files changed, 157 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/456d0aba/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java
b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java
index 69f61bc..dafe86f 100644
--- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java
+++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java
@@ -45,7 +45,7 @@ public class SessionWindowing {
 
 		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
-		env.setParallelism(2);
+		env.setParallelism(1);
 
 		final List<Tuple3<String, Long, Integer>> input = new ArrayList<>();
 
@@ -103,7 +103,7 @@ public class SessionWindowing {
 
 		private final Long sessionTimeout;
 
-		private final ValueStateDescriptor<Long> stateDesc = new ValueStateDescriptor<>("last-seen", 1L,
+		private final ValueStateDescriptor<Long> stateDesc = new ValueStateDescriptor<>("last-seen", -1L,
 			BasicTypeInfo.LONG_TYPE_INFO.createSerializer(new ExecutionConfig()));
 
 
@@ -120,12 +120,15 @@ public class SessionWindowing {
 
 			Long timeSinceLastEvent = timestamp - lastSeen;
 
+			ctx.deleteEventTimeTimer(lastSeen + sessionTimeout);
+
 			// Update the last seen event time
 			lastSeenState.update(timestamp);
 
 			ctx.registerEventTimeTimer(timestamp + sessionTimeout);
 
-			if (timeSinceLastEvent > sessionTimeout) {
+			if (lastSeen != -1 && timeSinceLastEvent > sessionTimeout) {
+				System.out.println("FIRING ON ELEMENT: " + element + " ts: " + timestamp + " last " + lastSeen);
 				return TriggerResult.FIRE_AND_PURGE;
 			} else {
 				return TriggerResult.CONTINUE;
@@ -138,6 +141,7 @@ public class SessionWindowing {
 			Long lastSeen = lastSeenState.value();
 
 			if (time - lastSeen >= sessionTimeout) {
+				System.out.println("CTX: " + ctx + " Firing Time " + time + " last seen " + lastSeen);
 				return TriggerResult.FIRE_AND_PURGE;
 			}
 			return TriggerResult.CONTINUE;
@@ -147,6 +151,15 @@ public class SessionWindowing {
 		public TriggerResult onProcessingTime(long time, GlobalWindow window, TriggerContext ctx) throws Exception {
 			return TriggerResult.CONTINUE;
 		}
+
+		@Override
+		public void clear(GlobalWindow window, TriggerContext ctx) throws Exception {
+			ValueState<Long> lastSeenState = ctx.getPartitionedState(stateDesc);
+			if (lastSeenState.value() != -1) {
+				ctx.deleteEventTimeTimer(lastSeenState.value() + sessionTimeout);
+			}
+			lastSeenState.clear();
+		}
 	}
 
 	// *************************************************************************

http://git-wip-us.apache.org/repos/asf/flink/blob/456d0aba/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java
index 99a4962..d3eb2ac 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java
@@ -87,6 +87,9 @@ public class GlobalWindows extends WindowAssigner<Object, GlobalWindow> {
 		public TriggerResult onProcessingTime(long time, GlobalWindow window, TriggerContext ctx) {
 			return TriggerResult.CONTINUE;
 		}
+
+		@Override
+		public void clear(GlobalWindow window, TriggerContext ctx) throws Exception {}
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/456d0aba/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java
index 17818af..21e35db 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java
@@ -74,6 +74,9 @@ public class ContinuousEventTimeTrigger<W extends Window> implements Trigger<Obj
 	}
 
 	@Override
+	public void clear(W window, TriggerContext ctx) throws Exception {}
+
+	@Override
 	public String toString() {
 		return "ContinuousProcessingTimeTrigger(" + interval + ")";
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/456d0aba/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java
index 20a2274..10c975f 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java
@@ -90,6 +90,9 @@ public class ContinuousProcessingTimeTrigger<W extends Window> implements Trigge
 		return TriggerResult.CONTINUE;
 	}
 
+	@Override
+	public void clear(W window, TriggerContext ctx) throws Exception {}
+
 	@VisibleForTesting
 	public long getInterval() {
 		return interval;

http://git-wip-us.apache.org/repos/asf/flink/blob/456d0aba/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/CountTrigger.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/CountTrigger.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/CountTrigger.java
index e8742d5..3fcfb46 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/CountTrigger.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/CountTrigger.java
@@ -66,6 +66,11 @@ public class CountTrigger<W extends Window> implements Trigger<Object, W> {
 	}
 
 	@Override
+	public void clear(W window, TriggerContext ctx) throws Exception {
+		ctx.getPartitionedState(stateDesc).clear();
+	}
+
+	@Override
 	public String toString() {
 		return "CountTrigger(" +  maxCount + ")";
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/456d0aba/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/DeltaTrigger.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/DeltaTrigger.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/DeltaTrigger.java
index 60ada88..3135961 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/DeltaTrigger.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/DeltaTrigger.java
@@ -72,6 +72,11 @@ public class DeltaTrigger<T, W extends Window> implements Trigger<T, W> {
 	}
 
 	@Override
+	public void clear(W window, TriggerContext ctx) throws Exception {
+		ctx.getPartitionedState(stateDesc).clear();
+	}
+
+	@Override
 	public String toString() {
 		return "DeltaTrigger(" +  deltaFunction + ", " + threshold + ")";
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/456d0aba/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/EventTimeTrigger.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/EventTimeTrigger.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/EventTimeTrigger.java
index 831e360..bbd0a01 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/EventTimeTrigger.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/EventTimeTrigger.java
@@ -47,6 +47,11 @@ public class EventTimeTrigger implements Trigger<Object, TimeWindow> {
 	}
 
 	@Override
+	public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
+		ctx.deleteEventTimeTimer(window.maxTimestamp());
+	}
+
+	@Override
 	public String toString() {
 		return "EventTimeTrigger()";
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/456d0aba/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java
index b460c8a..85d6749 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java
@@ -45,6 +45,11 @@ public class ProcessingTimeTrigger implements Trigger<Object, TimeWindow> {
 	}
 
 	@Override
+	public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
+		ctx.deleteProcessingTimeTimer(window.maxTimestamp());
+	}
+
+	@Override
 	public String toString() {
 		return "ProcessingTimeTrigger()";
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/456d0aba/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/PurgingTrigger.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/PurgingTrigger.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/PurgingTrigger.java
index cc20296..626906c 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/PurgingTrigger.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/PurgingTrigger.java
@@ -79,6 +79,11 @@ public class PurgingTrigger<T, W extends Window> implements Trigger<T, W> {
 	}
 
 	@Override
+	public void clear(W window, TriggerContext ctx) throws Exception {
+		nestedTrigger.clear(window, ctx);
+	}
+
+	@Override
 	public String toString() {
 		return "PurgingTrigger(" + nestedTrigger.toString() + ")";
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/456d0aba/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java
index 8ea50b3..5c71355 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java
@@ -73,6 +73,13 @@ public interface Trigger<T, W extends Window> extends Serializable {
 	 */
 	TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception;
 
+	/**
+	 * Clears any state that the trigger might still hold for the given window. This is called
+	 * when a window is purged. Timers set using {@link TriggerContext#registerEventTimeTimer(long)}
+	 * and {@link TriggerContext#registerProcessingTimeTimer(long)} should be deleted here as
+	 * well as state acquired using {@link TriggerContext#getPartitionedState(StateDescriptor)}.
+	 */
+	void clear(W window, TriggerContext ctx) throws Exception;
 
 	/**
 	 * Result type for trigger methods. This determines what happens with the window.
@@ -151,6 +158,16 @@ public interface Trigger<T, W extends Window> extends Serializable {
 		void registerEventTimeTimer(long time);
 
 		/**
+		 * Delete the processing time trigger for the given time.
+		 */
+		void deleteProcessingTimeTimer(long time);
+
+		/**
+		 * Delete the event-time trigger for the given time.
+		 */
+		void deleteEventTimeTimer(long time);
+
+		/**
 		 * Retrieves an {@link State} object that can be used to interact with
 		 * fault-tolerant state that is scoped to the window and key of the current
 		 * trigger invocation.

http://git-wip-us.apache.org/repos/asf/flink/blob/456d0aba/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
index d9fa9f0..1b712d9 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
@@ -284,6 +284,10 @@ public class NonKeyedWindowOperator<IN, OUT, W extends Window>
 		if (triggerResult.isFire()) {
 			emitWindow(context);
 		}
+
+		if (triggerResult.isPurge()) {
+			context.clear();
+		}
 	}
 
 	@Override
@@ -516,6 +520,23 @@ public class NonKeyedWindowOperator<IN, OUT, W extends Window>
 			triggers.add(this);
 		}
 
+		@Override
+		public void deleteProcessingTimeTimer(long time) {
+			Set<Context> triggers = processingTimeTimers.get(time);
+			if (triggers != null) {
+				triggers.remove(this);
+			}
+		}
+
+		@Override
+		public void deleteEventTimeTimer(long time) {
+			Set<Context> triggers = watermarkTimers.get(time);
+			if (triggers != null) {
+				triggers.remove(this);
+			}
+
+		}
+
 		public Trigger.TriggerResult onElement(StreamRecord<IN> element) throws Exception {
 			Trigger.TriggerResult onElementResult = trigger.onElement(element.getValue(), element.getTimestamp(), window, this);
 			if (watermarkTimer > 0 && watermarkTimer <= currentWatermark) {
@@ -553,6 +574,10 @@ public class NonKeyedWindowOperator<IN, OUT, W extends Window>
 				return Trigger.TriggerResult.CONTINUE;
 			}
 		}
+
+		public void clear() throws Exception {
+			trigger.clear(window, this);
+		}
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/456d0aba/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
index 5109dae..d562925 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
@@ -1,20 +1,20 @@
 /**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.flink.streaming.runtime.operators.windowing;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -85,8 +85,8 @@ import static java.util.Objects.requireNonNull;
  * @param <W> The type of {@code Window} that the {@code WindowAssigner} assigns.
  */
 public class WindowOperator<K, IN, ACC, OUT, W extends Window>
-		extends AbstractUdfStreamOperator<OUT, WindowFunction<ACC, OUT, K, W>>
-		implements OneInputStreamOperator<IN, OUT>, Triggerable, InputTypeConfigurable {
+	extends AbstractUdfStreamOperator<OUT, WindowFunction<ACC, OUT, K, W>>
+	implements OneInputStreamOperator<IN, OUT>, Triggerable, InputTypeConfigurable {
 
 	private static final long serialVersionUID = 1L;
 
@@ -164,12 +164,12 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 	 * Creates a new {@code WindowOperator} based on the given policies and user functions.
 	 */
 	public WindowOperator(WindowAssigner<? super IN, W> windowAssigner,
-			TypeSerializer<W> windowSerializer,
-			KeySelector<IN, K> keySelector,
-			TypeSerializer<K> keySerializer,
-			StateDescriptor<? extends MergingState<IN, ACC>> windowStateDescriptor,
-			WindowFunction<ACC, OUT, K, W> windowFunction,
-			Trigger<? super IN, ? super W> trigger) {
+		TypeSerializer<W> windowSerializer,
+		KeySelector<IN, K> keySelector,
+		TypeSerializer<K> keySerializer,
+		StateDescriptor<? extends MergingState<IN, ACC>> windowStateDescriptor,
+		WindowFunction<ACC, OUT, K, W> windowFunction,
+		Trigger<? super IN, ? super W> trigger) {
 
 		super(windowFunction);
 
@@ -258,8 +258,6 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 		if (triggerResult.isFire()) {
 			timestampedCollector.setTimestamp(window.maxTimestamp());
 
-			setKeyContext(key);
-
 			MergingState<IN, ACC> windowState = getPartitionedState(window, windowSerializer,
 				windowStateDescriptor);
 
@@ -269,12 +267,13 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 
 			if (triggerResult.isPurge()) {
 				windowState.clear();
+				context.clear();
 			}
 		} else if (triggerResult.isPurge()) {
-			setKeyContext(key);
 			MergingState<IN, ACC> windowState = getPartitionedState(window, windowSerializer,
 				windowStateDescriptor);
 			windowState.clear();
+			context.clear();
 		}
 	}
 
@@ -293,7 +292,8 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 
 				context.key = timer.key;
 				context.window = timer.window;
-				Trigger.TriggerResult triggerResult = context.onEventTime(mark.getTimestamp());
+				setKeyContext(timer.key);
+				Trigger.TriggerResult triggerResult = context.onEventTime(timer.timestamp);
 				processTriggerResult(triggerResult, context.key, context.window);
 			} else {
 				fire = false;
@@ -319,7 +319,8 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 
 				context.key = timer.key;
 				context.window = timer.window;
-				Trigger.TriggerResult triggerResult = context.onProcessingTime(time);
+				setKeyContext(timer.key);
+				Trigger.TriggerResult triggerResult = context.onProcessingTime(timer.timestamp);
 				processTriggerResult(triggerResult, context.key, context.window);
 			} else {
 				fire = false;
@@ -410,6 +411,23 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 			}
 		}
 
+		@Override
+		public void deleteProcessingTimeTimer(long time) {
+			Timer<K, W> timer = new Timer<>(time, key, window);
+			if (processingTimeTimers.remove(timer)) {
+				processingTimeTimersQueue.remove(timer);
+			}
+		}
+
+		@Override
+		public void deleteEventTimeTimer(long time) {
+			Timer<K, W> timer = new Timer<>(time, key, window);
+			if (watermarkTimers.remove(timer)) {
+				watermarkTimersQueue.remove(timer);
+			}
+
+		}
+
 		public Trigger.TriggerResult onElement(StreamRecord<IN> element) throws Exception {
 			return trigger.onElement(element.getValue(), element.getTimestamp(), window, this);
 		}
@@ -421,6 +439,18 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 		public Trigger.TriggerResult onEventTime(long time) throws Exception {
 			return trigger.onEventTime(time, window, this);
 		}
+
+		public void clear() throws Exception {
+			trigger.clear(window, this);
+		}
+
+		@Override
+		public String toString() {
+			return "Context{" +
+				"key=" + key +
+				", window=" + window +
+				'}';
+		}
 	}
 
 	/**
@@ -454,8 +484,8 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 			Timer<?, ?> timer = (Timer<?, ?>) o;
 
 			return timestamp == timer.timestamp
-					&& key.equals(timer.key)
-					&& window.equals(timer.window);
+				&& key.equals(timer.key)
+				&& window.equals(timer.window);
 
 		}
 
@@ -470,10 +500,10 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 		@Override
 		public String toString() {
 			return "Timer{" +
-					"timestamp=" + timestamp +
-					", key=" + key +
-					", window=" + window +
-					'}';
+				"timestamp=" + timestamp +
+				", key=" + key +
+				", window=" + window +
+				'}';
 		}
 	}
 


Mime
View raw message