flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From fhue...@apache.org
Subject [25/50] [abbrv] flink git commit: [FLINK-3371] [api-breaking] Move TriggerResult and TriggerContext to dedicated classes
Date Fri, 12 Feb 2016 11:29:50 GMT
[FLINK-3371] [api-breaking] Move TriggerResult and TriggerContext to dedicated classes

This closes #1603


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/50bd65a5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/50bd65a5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/50bd65a5

Branch: refs/heads/tableOnCalcite
Commit: 50bd65a574776817a03dd32fd438cb2327447109
Parents: 8df0bba
Author: Stephan Ewen <sewen@apache.org>
Authored: Sun Feb 7 21:46:16 2016 +0100
Committer: Stephan Ewen <sewen@apache.org>
Committed: Wed Feb 10 22:15:31 2016 +0100

----------------------------------------------------------------------
 .../examples/windowing/SessionWindowing.java    |   3 +-
 .../api/windowing/assigners/GlobalWindows.java  |  10 +-
 .../triggers/ContinuousEventTimeTrigger.java    |   7 +-
 .../ContinuousProcessingTimeTrigger.java        |   2 +-
 .../api/windowing/triggers/CountTrigger.java    |   2 +-
 .../api/windowing/triggers/DeltaTrigger.java    |   7 +-
 .../windowing/triggers/EventTimeTrigger.java    |   5 +-
 .../triggers/ProcessingTimeTrigger.java         |   5 +-
 .../api/windowing/triggers/PurgingTrigger.java  |   4 +-
 .../api/windowing/triggers/Trigger.java         | 102 +++++--------------
 .../api/windowing/triggers/TriggerResult.java   |  96 +++++++++++++++++
 .../windowing/EvictingWindowOperator.java       |   5 +-
 .../windowing/NonKeyedWindowOperator.java       |  34 ++++---
 .../operators/windowing/WindowOperator.java     |  18 ++--
 14 files changed, 179 insertions(+), 121 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/50bd65a5/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java
b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java
index bd82800..e2df160 100644
--- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java
+++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java
@@ -27,6 +27,7 @@ import org.apache.flink.streaming.api.functions.source.EventTimeSourceFunction;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
 import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
 import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
 
 import java.util.ArrayList;
@@ -95,7 +96,7 @@ public class SessionWindowing {
 		env.execute();
 	}
 
-	private static class SessionTrigger implements Trigger<Tuple3<String, Long, Integer>,
GlobalWindow> {
+	private static class SessionTrigger extends Trigger<Tuple3<String, Long, Integer>,
GlobalWindow> {
 
 		private static final long serialVersionUID = 1L;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/50bd65a5/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java
index d3eb2ac..a4d92cf 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java
@@ -21,6 +21,7 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
 import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
 
 import java.util.Collection;
@@ -67,15 +68,12 @@ public class GlobalWindows extends WindowAssigner<Object, GlobalWindow>
{
 	/**
 	 * A trigger that never fires, as default Trigger for GlobalWindows.
 	 */
-	private static class NeverTrigger implements Trigger<Object, GlobalWindow> {
+	private static class NeverTrigger extends Trigger<Object, GlobalWindow> {
 		private static final long serialVersionUID = 1L;
 
 		@Override
-		public TriggerResult onElement(Object element,
-				long timestamp,
-				GlobalWindow window,
-				TriggerContext ctx) {
-				return TriggerResult.CONTINUE;
+		public TriggerResult onElement(Object element, long timestamp, GlobalWindow window, TriggerContext
ctx) {
+			return TriggerResult.CONTINUE;
 		}
 
 		@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/50bd65a5/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java
index 02a935c..09f3959 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- * http://www.apache.org/licenses/LICENSE-2.0
+ *   http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.streaming.api.windowing.triggers;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -33,7 +34,7 @@ import org.apache.flink.streaming.api.windowing.windows.Window;
  *
  * @param <W> The type of {@link Window Windows} on which this trigger can operate.
  */
-public class ContinuousEventTimeTrigger<W extends Window> implements Trigger<Object,
W> {
+public class ContinuousEventTimeTrigger<W extends Window> extends Trigger<Object,
W> {
 	private static final long serialVersionUID = 1L;
 
 	private final long interval;

http://git-wip-us.apache.org/repos/asf/flink/blob/50bd65a5/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java
index 25d9508..ca7ecb6 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java
@@ -32,7 +32,7 @@ import org.apache.flink.streaming.api.windowing.windows.Window;
  *
  * @param <W> The type of {@link Window Windows} on which this trigger can operate.
  */
-public class ContinuousProcessingTimeTrigger<W extends Window> implements Trigger<Object,
W> {
+public class ContinuousProcessingTimeTrigger<W extends Window> extends Trigger<Object,
W> {
 	private static final long serialVersionUID = 1L;
 
 	private final long interval;

http://git-wip-us.apache.org/repos/asf/flink/blob/50bd65a5/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/CountTrigger.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/CountTrigger.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/CountTrigger.java
index 725cbf6..5113991 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/CountTrigger.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/CountTrigger.java
@@ -30,7 +30,7 @@ import java.io.IOException;
  *
  * @param <W> The type of {@link Window Windows} on which this trigger can operate.
  */
-public class CountTrigger<W extends Window> implements Trigger<Object, W> {
+public class CountTrigger<W extends Window> extends Trigger<Object, W> {
 	private static final long serialVersionUID = 1L;
 
 	private final long maxCount;

http://git-wip-us.apache.org/repos/asf/flink/blob/50bd65a5/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/DeltaTrigger.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/DeltaTrigger.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/DeltaTrigger.java
index 55c719a..4a6cde3 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/DeltaTrigger.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/DeltaTrigger.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- * http://www.apache.org/licenses/LICENSE-2.0
+ *   http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.streaming.api.windowing.triggers;
 
 import org.apache.flink.api.common.state.ValueState;
@@ -33,7 +34,7 @@ import org.apache.flink.streaming.api.windowing.windows.Window;
  *
  * @param <W> The type of {@link Window Windows} on which this trigger can operate.
  */
-public class DeltaTrigger<T, W extends Window> implements Trigger<T, W> {
+public class DeltaTrigger<T, W extends Window> extends Trigger<T, W> {
 	private static final long serialVersionUID = 1L;
 
 	private final DeltaFunction<T> deltaFunction;

http://git-wip-us.apache.org/repos/asf/flink/blob/50bd65a5/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/EventTimeTrigger.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/EventTimeTrigger.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/EventTimeTrigger.java
index bbd0a01..17d04c2 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/EventTimeTrigger.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/EventTimeTrigger.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.streaming.api.windowing.triggers;
 
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
@@ -25,7 +26,7 @@ import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
  *
  * @see org.apache.flink.streaming.api.watermark.Watermark
  */
-public class EventTimeTrigger implements Trigger<Object, TimeWindow> {
+public class EventTimeTrigger extends Trigger<Object, TimeWindow> {
 	private static final long serialVersionUID = 1L;
 
 	private EventTimeTrigger() {}

http://git-wip-us.apache.org/repos/asf/flink/blob/50bd65a5/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java
index 85d6749..ae0b0e5 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.streaming.api.windowing.triggers;
 
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
@@ -23,7 +24,7 @@ import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
  * A {@link Trigger} that fires once the current system time passes the end of the window
  * to which a pane belongs.
  */
-public class ProcessingTimeTrigger implements Trigger<Object, TimeWindow> {
+public class ProcessingTimeTrigger extends Trigger<Object, TimeWindow> {
 	private static final long serialVersionUID = 1L;
 
 	private ProcessingTimeTrigger() {}

http://git-wip-us.apache.org/repos/asf/flink/blob/50bd65a5/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/PurgingTrigger.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/PurgingTrigger.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/PurgingTrigger.java
index 626906c..0ec236b 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/PurgingTrigger.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/PurgingTrigger.java
@@ -25,12 +25,12 @@ import org.apache.flink.streaming.api.windowing.windows.Window;
  *
  * <p>
  * When the nested trigger fires, this will return a {@code FIRE_AND_PURGE}
- * {@link org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerResult}
+ * {@link TriggerResult}
  *
  * @param <T> The type of elements on which this trigger can operate.
  * @param <W> The type of {@link Window Windows} on which this trigger can operate.
  */
-public class PurgingTrigger<T, W extends Window> implements Trigger<T, W> {
+public class PurgingTrigger<T, W extends Window> extends Trigger<T, W> {
 	private static final long serialVersionUID = 1L;
 
 	private Trigger<T, W> nestedTrigger;

http://git-wip-us.apache.org/repos/asf/flink/blob/50bd65a5/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java
index fb61064..a200e5c 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.streaming.api.windowing.triggers;
 
 import org.apache.flink.api.common.state.State;
@@ -39,12 +40,14 @@ import java.io.Serializable;
  * <p>
  * Triggers must not maintain state internally since they can be re-created or reused for
  * different keys. All necessary state should be persisted using the state abstraction
- * available on the {@link org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext}.
+ * available on the {@link TriggerContext}.
  *
  * @param <T> The type of elements on which this {@code Trigger} works.
  * @param <W> The type of {@link Window Windows} on which this {@code Trigger} can
operate.
  */
-public interface Trigger<T, W extends Window> extends Serializable {
+public abstract class Trigger<T, W extends Window> implements Serializable {
+	
+	private static final long serialVersionUID = -4104633972991191369L;
 
 	/**
 	 * Called for every element that gets added to a pane. The result of this will determine
@@ -55,7 +58,7 @@ public interface Trigger<T, W extends Window> extends Serializable
{
 	 * @param window The window to which this pane belongs.
 	 * @param ctx A context object that can be used to register timer callbacks.
 	 */
-	TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx) throws
Exception;
+	public abstract TriggerResult onElement(T element, long timestamp, W window, TriggerContext
ctx) throws Exception;
 
 	/**
 	 * Called when a processing-time timer that was set using the trigger context fires.
@@ -63,7 +66,7 @@ public interface Trigger<T, W extends Window> extends Serializable
{
 	 * @param time The timestamp at which the timer fired.
 	 * @param ctx A context object that can be used to register timer callbacks.
 	 */
-	TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception;
+	public abstract TriggerResult onProcessingTime(long time, W window, TriggerContext ctx)
throws Exception;
 
 	/**
 	 * Called when an event-time timer that was set using the trigger context fires.
@@ -71,102 +74,53 @@ public interface Trigger<T, W extends Window> extends Serializable
{
 	 * @param time The timestamp at which the timer fired.
 	 * @param ctx A context object that can be used to register timer callbacks.
 	 */
-	TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception;
+	public abstract TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws
Exception;
 
 	/**
 	 * Clears any state that the trigger might still hold for the given window. This is called
 	 * when a window is purged. Timers set using {@link TriggerContext#registerEventTimeTimer(long)}
 	 * and {@link TriggerContext#registerProcessingTimeTimer(long)} should be deleted here as
 	 * well as state acquired using {@link TriggerContext#getPartitionedState(StateDescriptor)}.
+	 * 
+	 * <p>By default, this method does nothing.
 	 */
-	void clear(W window, TriggerContext ctx) throws Exception;
+	public void clear(W window, TriggerContext ctx) throws Exception {}
 
+	// ------------------------------------------------------------------------
+	
 	/**
-	 * Result type for trigger methods. This determines what happens with the window.
-	 *
-	 * <p>
-	 * On {@code FIRE} the pane is evaluated and results are emitted. The contents of the window
-	 * are kept. {@code FIRE_AND_PURGE} acts like {@code FIRE} but the contents of the pane
-	 * are purged. On {@code CONTINUE} nothing happens, processing continues. On {@code PURGE}
-	 * the contents of the window are discarded and no result is emitted for the window.
-	 */
-	enum TriggerResult {
-		CONTINUE(false, false), FIRE_AND_PURGE(true, true), FIRE(true, false), PURGE(false, true);
-
-		private final boolean fire;
-		private final boolean purge;
-
-		TriggerResult(boolean fire, boolean purge) {
-			this.purge = purge;
-			this.fire = fire;
-		}
-
-		public boolean isFire() {
-			return fire;
-		}
-
-		public boolean isPurge() {
-			return purge;
-		}
-
-		/**
-		 * Merges two {@code TriggerResults}. This specifies what should happen if we have
-		 * two results from a Trigger, for example as a result from
-		 * {@link #onElement(Object, long, Window, TriggerContext)} and
-		 * {@link #onEventTime(long, Window, TriggerContext)}.
-		 *
-		 * <p>
-		 * For example, if one result says {@code CONTINUE} while the other says {@code FIRE}
-		 * then {@code FIRE} is the combined result;
-		 */
-		public static TriggerResult merge(TriggerResult a, TriggerResult b) {
-			if (a.purge || b.purge) {
-				if (a.fire || b.fire) {
-					return FIRE_AND_PURGE;
-				} else {
-					return PURGE;
-				}
-			} else if (a.fire || b.fire) {
-				return FIRE;
-			} else {
-				return CONTINUE;
-			}
-		}
-	}
-
-	/**
-	 * A context object that is given to {@code Trigger} methods to allow them to register timer
+	 * A context object that is given to {@link Trigger} methods to allow them to register timer
 	 * callbacks and deal with state.
 	 */
-	interface TriggerContext {
-
+	public interface TriggerContext {
+	
 		/**
 		 * Register a system time callback. When the current system time passes the specified
-		 * time {@link #onProcessingTime(long, Window, TriggerContext)} is called with the time
specified here.
+		 * time {@link Trigger#onProcessingTime(long, Window, TriggerContext)} is called with the
time specified here.
 		 *
-		 * @param time The time at which to invoke {@link #onProcessingTime(long, Window, TriggerContext)}
+		 * @param time The time at which to invoke {@link Trigger#onProcessingTime(long, Window,
TriggerContext)}
 		 */
 		void registerProcessingTimeTimer(long time);
-
+	
 		/**
 		 * Register an event-time callback. When the current watermark passes the specified
-		 * time {@link #onEventTime(long, Window, TriggerContext)} is called with the time specified
here.
+		 * time {@link Trigger#onEventTime(long, Window, TriggerContext)} is called with the time
specified here.
 		 *
-		 * @param time The watermark at which to invoke {@link #onEventTime(long, Window, TriggerContext)}
+		 * @param time The watermark at which to invoke {@link Trigger#onEventTime(long, Window,
TriggerContext)}
 		 * @see org.apache.flink.streaming.api.watermark.Watermark
 		 */
 		void registerEventTimeTimer(long time);
-
+	
 		/**
 		 * Delete the processing time trigger for the given time.
 		 */
 		void deleteProcessingTimeTimer(long time);
-
+	
 		/**
 		 * Delete the event-time trigger for the given time.
 		 */
 		void deleteEventTimeTimer(long time);
-
+	
 		/**
 		 * Retrieves an {@link State} object that can be used to interact with
 		 * fault-tolerant state that is scoped to the window and key of the current
@@ -180,7 +134,7 @@ public interface Trigger<T, W extends Window> extends Serializable
{
 		 *                                       function (function is not part os a KeyedStream).
 		 */
 		<S extends State> S getPartitionedState(StateDescriptor<S, ?> stateDescriptor);
-
+	
 		/**
 		 * Retrieves a {@link ValueState} object that can be used to interact with
 		 * fault-tolerant state that is scoped to the window and key of the current
@@ -199,8 +153,8 @@ public interface Trigger<T, W extends Window> extends Serializable
{
 		 */
 		@Deprecated
 		<S extends Serializable> ValueState<S> getKeyValueState(String name, Class<S>
stateType, S defaultState);
-
-
+	
+	
 		/**
 		 * Retrieves a {@link ValueState} object that can be used to interact with
 		 * fault-tolerant state that is scoped to the window and key of the current

http://git-wip-us.apache.org/repos/asf/flink/blob/50bd65a5/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/TriggerResult.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/TriggerResult.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/TriggerResult.java
new file mode 100644
index 0000000..2841542
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/TriggerResult.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.triggers;
+
+import org.apache.flink.streaming.api.windowing.windows.Window;
+
+/**
+ * Result type for trigger methods. This determines what happens with the window,
+ * for example whether the window function should be called, or the window
+ * should be discarded.
+ */
+public enum TriggerResult {
+
+	/**
+	 * No action is taken on the window.
+	 */
+	CONTINUE(false, false),
+
+	/**
+	 * {@code FIRE_AND_PURGE} evaluates the window function and emits the window
+	 * result, then purges all elements in the window.
+	 */
+	FIRE_AND_PURGE(true, true),
+
+	/**
+	 * On {@code FIRE}, the window is evaluated and results are emitted.
+	 * The window is not purged, though; all elements are retained.
+	 */
+	FIRE(true, false),
+
+	/**
+	 * All elements in the window are cleared and the window is discarded,
+	 * without evaluating the window function or emitting any elements.
+	 */
+	PURGE(false, true);
+
+	// ------------------------------------------------------------------------
+	
+	private final boolean fire;
+	private final boolean purge;
+
+	TriggerResult(boolean fire, boolean purge) {
+		this.purge = purge;
+		this.fire = fire;
+	}
+
+	public boolean isFire() {
+		return fire;
+	}
+
+	public boolean isPurge() {
+		return purge;
+	}
+
+	// ------------------------------------------------------------------------
+	
+	/**
+	 * Merges two {@code TriggerResults}. This specifies what should happen if we have
+	 * two results from a Trigger, for example as a result from
+	 * {@link Trigger#onElement(Object, long, Window, Trigger.TriggerContext)} and
+	 * {@link Trigger#onEventTime(long, Window, Trigger.TriggerContext)}.
+	 *
+	 * <p>
+	 * For example, if one result says {@code CONTINUE} while the other says {@code FIRE}
+	 * then {@code FIRE} is the combined result.
+	 */
+	public static TriggerResult merge(TriggerResult a, TriggerResult b) {
+		if (a.purge || b.purge) {
+			if (a.fire || b.fire) {
+				return FIRE_AND_PURGE;
+			} else {
+				return PURGE;
+			}
+		} else if (a.fire || b.fire) {
+			return FIRE;
+		} else {
+			return CONTINUE;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/50bd65a5/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
index 41ec91a..a960ac4 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
@@ -30,6 +30,7 @@ import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
 import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
 import org.apache.flink.streaming.api.windowing.evictors.Evictor;
 import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
 import org.apache.flink.streaming.api.windowing.windows.Window;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
@@ -87,7 +88,7 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window>
extends Window
 
 			context.key = key;
 			context.window = window;
-			Trigger.TriggerResult triggerResult = context.onElement(element);
+			TriggerResult triggerResult = context.onElement(element);
 
 			processTriggerResult(triggerResult, key, window);
 		}
@@ -95,7 +96,7 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window>
extends Window
 
 	@Override
 	@SuppressWarnings("unchecked,rawtypes")
-	protected void processTriggerResult(Trigger.TriggerResult triggerResult, K key, W window)
throws Exception {
+	protected void processTriggerResult(TriggerResult triggerResult, K key, W window) throws
Exception {
 		if (!triggerResult.isFire() && !triggerResult.isPurge()) {
 			// do nothing
 			return;

http://git-wip-us.apache.org/repos/asf/flink/blob/50bd65a5/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
index d7dbaf5..93761e6 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
@@ -38,6 +38,8 @@ import org.apache.flink.streaming.api.operators.TimestampedCollector;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
 import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext;
+import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
 import org.apache.flink.streaming.api.windowing.windows.Window;
 import org.apache.flink.streaming.runtime.operators.Triggerable;
 import org.apache.flink.streaming.runtime.operators.windowing.buffers.WindowBuffer;
@@ -248,7 +250,7 @@ public class NonKeyedWindowOperator<IN, OUT, W extends Window>
 				windows.put(window, context);
 			}
 			context.windowBuffer.storeElement(element);
-			Trigger.TriggerResult triggerResult = context.onElement(element);
+			TriggerResult triggerResult = context.onElement(element);
 			processTriggerResult(triggerResult, window);
 		}
 	}
@@ -264,7 +266,7 @@ public class NonKeyedWindowOperator<IN, OUT, W extends Window>
 		}
 	}
 
-	private void processTriggerResult(Trigger.TriggerResult triggerResult, W window) throws Exception {
+	private void processTriggerResult(TriggerResult triggerResult, W window) throws Exception {
 		if (!triggerResult.isFire() && !triggerResult.isPurge()) {
 			// do nothing
 			return;
@@ -311,7 +313,7 @@ public class NonKeyedWindowOperator<IN, OUT, W extends Window>
 				// We have to check here whether the entry in the set still reflects the
 				// currently set timer in the Context.
 				if (ctx.watermarkTimer <= mark.getTimestamp()) {
-					Trigger.TriggerResult triggerResult = ctx.onEventTime(ctx.watermarkTimer);
+					TriggerResult triggerResult = ctx.onEventTime(ctx.watermarkTimer);
 					processTriggerResult(triggerResult, ctx.window);
 				}
 			}
@@ -343,7 +345,7 @@ public class NonKeyedWindowOperator<IN, OUT, W extends Window>
 				// performance reasons. We have to check here whether the entry in the set still
 				// reflects the currently set timer in the Context.
 				if (ctx.processingTimeTimer <= time) {
-					Trigger.TriggerResult triggerResult = ctx.onProcessingTime(ctx.processingTimeTimer);
+					TriggerResult triggerResult = ctx.onProcessingTime(ctx.processingTimeTimer);
 					processTriggerResult(triggerResult, ctx.window);
 				}
 			}
@@ -360,7 +362,7 @@ public class NonKeyedWindowOperator<IN, OUT, W extends Window>
 	 * {@link org.apache.flink.streaming.api.windowing.assigners.WindowAssigner}. These panes all
 	 * have their own instance of the {@code Trigger}.
 	 */
-	protected class Context implements Trigger.TriggerContext {
+	protected class Context implements TriggerContext {
 		protected W window;
 
 		protected WindowBuffer<IN> windowBuffer;
@@ -538,41 +540,41 @@ public class NonKeyedWindowOperator<IN, OUT, W extends Window>
 
 		}
 
-		public Trigger.TriggerResult onElement(StreamRecord<IN> element) throws Exception {
-			Trigger.TriggerResult onElementResult = trigger.onElement(element.getValue(), element.getTimestamp(), window, this);
+		public TriggerResult onElement(StreamRecord<IN> element) throws Exception {
+			TriggerResult onElementResult = trigger.onElement(element.getValue(), element.getTimestamp(), window, this);
 			if (watermarkTimer > 0 && watermarkTimer <= currentWatermark) {
 				// fire now and don't wait for the next watermark update
-				Trigger.TriggerResult onEventTimeResult = onEventTime(watermarkTimer);
-				return Trigger.TriggerResult.merge(onElementResult, onEventTimeResult);
+				TriggerResult onEventTimeResult = onEventTime(watermarkTimer);
+				return TriggerResult.merge(onElementResult, onEventTimeResult);
 			} else {
 				return onElementResult;
 			}
 		}
 
-		public Trigger.TriggerResult onProcessingTime(long time) throws Exception {
+		public TriggerResult onProcessingTime(long time) throws Exception {
 			if (time == processingTimeTimer) {
 				processingTimeTimer = -1;
 				return trigger.onProcessingTime(time, window, this);
 			} else {
-				return Trigger.TriggerResult.CONTINUE;
+				return TriggerResult.CONTINUE;
 			}
 		}
 
-		public Trigger.TriggerResult onEventTime(long time) throws Exception {
+		public TriggerResult onEventTime(long time) throws Exception {
 			if (time == watermarkTimer) {
 				watermarkTimer = -1;
-				Trigger.TriggerResult firstTriggerResult = trigger.onEventTime(time, window, this);
+				TriggerResult firstTriggerResult = trigger.onEventTime(time, window, this);
 
 				if (watermarkTimer > 0 && watermarkTimer <= currentWatermark) {
 					// fire now and don't wait for the next watermark update
-					Trigger.TriggerResult secondTriggerResult = onEventTime(watermarkTimer);
-					return Trigger.TriggerResult.merge(firstTriggerResult, secondTriggerResult);
+					TriggerResult secondTriggerResult = onEventTime(watermarkTimer);
+					return TriggerResult.merge(firstTriggerResult, secondTriggerResult);
 				} else {
 					return firstTriggerResult;
 				}
 
 			} else {
-				return Trigger.TriggerResult.CONTINUE;
+				return TriggerResult.CONTINUE;
 			}
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/50bd65a5/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
index eccaeee..5fc89e8 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
@@ -40,6 +40,8 @@ import org.apache.flink.streaming.api.operators.TimestampedCollector;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
 import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext;
+import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
 import org.apache.flink.streaming.api.windowing.windows.Window;
 import org.apache.flink.streaming.runtime.operators.Triggerable;
 import org.apache.flink.streaming.runtime.operators.windowing.buffers.WindowBufferFactory;
@@ -243,13 +245,13 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 
 			context.key = key;
 			context.window = window;
-			Trigger.TriggerResult triggerResult = context.onElement(element);
+			TriggerResult triggerResult = context.onElement(element);
 
 			processTriggerResult(triggerResult, key, window);
 		}
 	}
 
-	protected void processTriggerResult(Trigger.TriggerResult triggerResult, K key, W window) throws Exception {
+	protected void processTriggerResult(TriggerResult triggerResult, K key, W window) throws Exception {
 		if (!triggerResult.isFire() && !triggerResult.isPurge()) {
 			// do nothing
 			return;
@@ -293,7 +295,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 				context.key = timer.key;
 				context.window = timer.window;
 				setKeyContext(timer.key);
-				Trigger.TriggerResult triggerResult = context.onEventTime(timer.timestamp);
+				TriggerResult triggerResult = context.onEventTime(timer.timestamp);
 				processTriggerResult(triggerResult, context.key, context.window);
 			} else {
 				fire = false;
@@ -320,7 +322,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 				context.key = timer.key;
 				context.window = timer.window;
 				setKeyContext(timer.key);
-				Trigger.TriggerResult triggerResult = context.onProcessingTime(timer.timestamp);
+				TriggerResult triggerResult = context.onProcessingTime(timer.timestamp);
 				processTriggerResult(triggerResult, context.key, context.window);
 			} else {
 				fire = false;
@@ -338,7 +340,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 	 * by setting the {@code key} and {@code window} fields. No internal state must be kept in
 	 * the {@code Context}
 	 */
-	protected class Context implements Trigger.TriggerContext {
+	protected class Context implements TriggerContext {
 		protected K key;
 		protected W window;
 
@@ -427,15 +429,15 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 
 		}
 
-		public Trigger.TriggerResult onElement(StreamRecord<IN> element) throws Exception {
+		public TriggerResult onElement(StreamRecord<IN> element) throws Exception {
 			return trigger.onElement(element.getValue(), element.getTimestamp(), window, this);
 		}
 
-		public Trigger.TriggerResult onProcessingTime(long time) throws Exception {
+		public TriggerResult onProcessingTime(long time) throws Exception {
 			return trigger.onProcessingTime(time, window, this);
 		}
 
-		public Trigger.TriggerResult onEventTime(long time) throws Exception {
+		public TriggerResult onEventTime(long time) throws Exception {
 			return trigger.onEventTime(time, window, this);
 		}
 


Mime
View raw message