flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aljos...@apache.org
Subject [3/4] flink git commit: [FLINK-2864] Make State of General-Purpose Window Operators Fault-Tolerant
Date Tue, 20 Oct 2015 16:42:22 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/44422697/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
index aecfd5d..7ab33cf 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
@@ -18,12 +18,15 @@
 package org.apache.flink.streaming.runtime.operators.windowing;
 
 import com.google.common.annotations.VisibleForTesting;
-
+import org.apache.commons.lang.SerializationUtils;
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.OperatorState;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.InputTypeConfigurable;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.runtime.state.StateHandle;
 import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
 import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
 import org.apache.flink.streaming.api.operators.ChainingStrategy;
@@ -37,11 +40,16 @@ import org.apache.flink.streaming.api.windowing.windows.Window;
 import org.apache.flink.streaming.runtime.operators.Triggerable;
 import org.apache.flink.streaming.runtime.operators.windowing.buffers.WindowBuffer;
 import org.apache.flink.streaming.runtime.operators.windowing.buffers.WindowBufferFactory;
+import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-
+import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -65,26 +73,70 @@ public class NonKeyedWindowOperator<IN, OUT, W extends Window>
 
 	private static final long serialVersionUID = 1L;
 
-	private static final Logger LOG = LoggerFactory.getLogger(NonKeyedWindowOperator.class);
+	private static final Logger LOG = LoggerFactory.getLogger(WindowOperator.class);
 
+	// ------------------------------------------------------------------------
+	// Configuration values and user functions
+	// ------------------------------------------------------------------------
 
 	private final WindowAssigner<? super IN, W> windowAssigner;
 
-	private final Trigger<? super IN, ? super W> triggerTemplate;
+	private final Trigger<? super IN, ? super W> trigger;
+
 	private final WindowBufferFactory<? super IN, ? extends WindowBuffer<IN>> windowBufferFactory;
 
-	protected transient Map<W, Tuple2<WindowBuffer<IN>, TriggerContext>> windows;
+	/**
+	 * If this is true, the current processing time is set as the timestamp of incoming elements.
+	 * This is for use with a {@link org.apache.flink.streaming.api.windowing.evictors.TimeEvictor}
+	 * if eviction should happen based on processing time.
+	 */
+	private boolean setProcessingTime = false;
 
-	private transient Map<Long, Set<TriggerContext>> processingTimeTimers;
-	private transient Map<Long, Set<TriggerContext>> watermarkTimers;
+	/**
+	 * This is used to copy the incoming element because it can be put into several window
+	 * buffers.
+	 */
+	private TypeSerializer<IN> inputSerializer;
 
+	/**
+	 * For serializing the window in checkpoints.
+	 */
+	private final TypeSerializer<W> windowSerializer;
+
+	// ------------------------------------------------------------------------
+	// State that is not checkpointed
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Processing time timers that are currently in-flight.
+	 */
+	private transient Map<Long, Set<Context>> processingTimeTimers;
+
+	/**
+	 * Current waiting watermark callbacks.
+	 */
+	private transient Map<Long, Set<Context>> watermarkTimers;
+
+	/**
+	 * This is given to the {@code WindowFunction} for emitting elements with a given timestamp.
+	 */
 	protected transient TimestampedCollector<OUT> timestampedCollector;
 
-	private boolean setProcessingTime = false;
+	// ------------------------------------------------------------------------
+	// State that needs to be checkpointed
+	// ------------------------------------------------------------------------
 
-	private TypeSerializer<IN> inputSerializer;
+	/**
+	 * The windows (panes) that are currently in-flight. Each pane is tracked by a {@code Context}
+	 * that holds its {@code WindowBuffer}, trigger state and registered timers.
+	 */
+	protected transient Map<W, Context> windows;
 
+	/**
+	 * Creates a new {@code NonKeyedWindowOperator} based on the given policies and user functions.
+	 */
 	public NonKeyedWindowOperator(WindowAssigner<? super IN, W> windowAssigner,
+			TypeSerializer<W> windowSerializer,
 			WindowBufferFactory<? super IN, ? extends WindowBuffer<IN>> windowBufferFactory,
 			AllWindowFunction<IN, OUT, W> windowFunction,
 			Trigger<? super IN, ? super W> trigger) {
@@ -92,25 +144,23 @@ public class NonKeyedWindowOperator<IN, OUT, W extends Window>
 		super(windowFunction);
 
 		this.windowAssigner = requireNonNull(windowAssigner);
+		this.windowSerializer = windowSerializer;
 
 		this.windowBufferFactory = requireNonNull(windowBufferFactory);
-		this.triggerTemplate = requireNonNull(trigger);
+		this.trigger = requireNonNull(trigger);
 
 		setChainingStrategy(ChainingStrategy.ALWAYS);
 	}
 
 	@Override
 	@SuppressWarnings("unchecked")
-	public void setInputType(TypeInformation<?> type, ExecutionConfig executionConfig) {
+	public final void setInputType(TypeInformation<?> type, ExecutionConfig executionConfig) {
 		inputSerializer = (TypeSerializer<IN>) type.createSerializer(executionConfig);
 	}
 
 	@Override
-	public void open() throws Exception {
+	public final void open() throws Exception {
 		super.open();
-		windows = new HashMap<>();
-		watermarkTimers = new HashMap<>();
-		processingTimeTimers = new HashMap<>();
 		timestampedCollector = new TimestampedCollector<>(output);
 
 		if (inputSerializer == null) {
@@ -119,14 +169,47 @@ public class NonKeyedWindowOperator<IN, OUT, W extends Window>
 
 		windowBufferFactory.setRuntimeContext(getRuntimeContext());
 		windowBufferFactory.open(getUserFunctionParameters());
+
+		// these could already be initialized from restoreState()
+		if (watermarkTimers == null) {
+			watermarkTimers = new HashMap<>();
+		}
+		if (processingTimeTimers == null) {
+			processingTimeTimers = new HashMap<>();
+		}
+		if (windows == null) {
+			windows = new HashMap<>();
+		}
+
+		// re-register timers that this window context had set
+		for (Context context: windows.values()) {
+			if (context.processingTimeTimer > 0) {
+				Set<Context> triggers = processingTimeTimers.get(context.processingTimeTimer);
+				if (triggers == null) {
+					getRuntimeContext().registerTimer(context.processingTimeTimer, NonKeyedWindowOperator.this);
+					triggers = new HashSet<>();
+					processingTimeTimers.put(context.processingTimeTimer, triggers);
+				}
+				triggers.add(context);
+			}
+			if (context.watermarkTimer > 0) {
+				Set<Context> triggers = watermarkTimers.get(context.watermarkTimer);
+				if (triggers == null) {
+					triggers = new HashSet<>();
+					watermarkTimers.put(context.watermarkTimer, triggers);
+				}
+				triggers.add(context);
+			}
+
+		}
 	}
 
 	@Override
-	public void close() throws Exception {
+	public final void close() throws Exception {
 		super.close();
 		// emit the elements that we still keep
-		for (W window: windows.keySet()) {
-			emitWindow(window, false);
+		for (Context window: windows.values()) {
+			emitWindow(window);
 		}
 		windows.clear();
 		windowBufferFactory.close();
@@ -134,58 +217,60 @@ public class NonKeyedWindowOperator<IN, OUT, W extends Window>
 
 	@Override
 	@SuppressWarnings("unchecked")
-	public void processElement(StreamRecord<IN> element) throws Exception {
+	public final void processElement(StreamRecord<IN> element) throws Exception {
 		if (setProcessingTime) {
 			element.replace(element.getValue(), System.currentTimeMillis());
 		}
+
 		Collection<W> elementWindows = windowAssigner.assignWindows(element.getValue(), element.getTimestamp());
 
 		for (W window: elementWindows) {
-			Tuple2<WindowBuffer<IN>, TriggerContext> bufferAndTrigger = windows.get(window);
-			if (bufferAndTrigger == null) {
-				bufferAndTrigger = new Tuple2<>();
-				bufferAndTrigger.f0 = windowBufferFactory.create();
-				bufferAndTrigger.f1 = new TriggerContext(window, triggerTemplate.duplicate());
-				windows.put(window, bufferAndTrigger);
+			Context context = windows.get(window);
+			if (context == null) {
+				WindowBuffer<IN> windowBuffer = windowBufferFactory.create();
+				context = new Context(window, windowBuffer);
+				windows.put(window, context);
 			}
 			StreamRecord<IN> elementCopy = new StreamRecord<>(inputSerializer.copy(element.getValue()), element.getTimestamp());
-			bufferAndTrigger.f0.storeElement(elementCopy);
-			Trigger.TriggerResult triggerResult = bufferAndTrigger.f1.trigger.onElement(elementCopy.getValue(), elementCopy.getTimestamp(), window, bufferAndTrigger.f1);
+			context.windowBuffer.storeElement(elementCopy);
+			Trigger.TriggerResult triggerResult = trigger.onElement(elementCopy.getValue(), elementCopy.getTimestamp(), window, context);
 			processTriggerResult(triggerResult, window);
 		}
 	}
 
-	protected void emitWindow(W window, boolean purge) throws Exception {
-		timestampedCollector.setTimestamp(window.getEnd());
-
-		Tuple2<WindowBuffer<IN>, TriggerContext> bufferAndTrigger;
-		if (purge) {
-			bufferAndTrigger = windows.remove(window);
-		} else {
-			bufferAndTrigger = windows.get(window);
-		}
-
-		if (bufferAndTrigger == null) {
-			LOG.debug("Window {} already gone.", window);
-			return;
-		}
-
+	protected void emitWindow(Context context) throws Exception {
+		timestampedCollector.setTimestamp(context.window.maxTimestamp());
 
 		userFunction.apply(
-				window,
-				bufferAndTrigger.f0.getUnpackedElements(),
+				context.window,
+				context.windowBuffer.getUnpackedElements(),
 				timestampedCollector);
 	}
 
 	private void processTriggerResult(Trigger.TriggerResult triggerResult, W window) throws Exception {
 		switch (triggerResult) {
-			case FIRE:
-				emitWindow(window, false);
+			case FIRE: {
+				Context context = windows.get(window);
+				if (context == null) {
+					LOG.debug("Window {} already gone.", window);
+					return;
+				}
+
+
+				emitWindow(context);
 				break;
+			}
 
-			case FIRE_AND_PURGE:
-				emitWindow(window, true);
+			case FIRE_AND_PURGE: {
+				Context context = windows.remove(window);
+				if (context == null) {
+					LOG.debug("Window {} already gone.", window);
+					return;
+				}
+
+				emitWindow(context);
 				break;
+			}
 
 			case CONTINUE:
 				// ingore
@@ -193,14 +278,14 @@ public class NonKeyedWindowOperator<IN, OUT, W extends Window>
 	}
 
 	@Override
-	public void processWatermark(Watermark mark) throws Exception {
+	public final void processWatermark(Watermark mark) throws Exception {
 		Set<Long> toRemove = new HashSet<>();
 
-		for (Map.Entry<Long, Set<TriggerContext>> triggers: watermarkTimers.entrySet()) {
+		for (Map.Entry<Long, Set<Context>> triggers: watermarkTimers.entrySet()) {
 			if (triggers.getKey() <= mark.getTimestamp()) {
-				for (TriggerContext trigger: triggers.getValue()) {
-					Trigger.TriggerResult triggerResult = trigger.trigger.onTime(mark.getTimestamp(), trigger);
-					processTriggerResult(triggerResult, trigger.window);
+				for (Context context: triggers.getValue()) {
+					Trigger.TriggerResult triggerResult = context.onEventTime(triggers.getKey());
+					processTriggerResult(triggerResult, context.window);
 				}
 				toRemove.add(triggers.getKey());
 			}
@@ -213,14 +298,14 @@ public class NonKeyedWindowOperator<IN, OUT, W extends Window>
 	}
 
 	@Override
-	public void trigger(long time) throws Exception {
+	public final void trigger(long time) throws Exception {
 		Set<Long> toRemove = new HashSet<>();
 
-		for (Map.Entry<Long, Set<TriggerContext>> triggers: processingTimeTimers.entrySet()) {
+		for (Map.Entry<Long, Set<Context>> triggers: processingTimeTimers.entrySet()) {
 			if (triggers.getKey() < time) {
-				for (TriggerContext trigger: triggers.getValue()) {
-					Trigger.TriggerResult triggerResult = trigger.trigger.onTime(time, trigger);
-					processTriggerResult(triggerResult, trigger.window);
+				for (Context context: triggers.getValue()) {
+					Trigger.TriggerResult triggerResult = context.onProcessingTime(time);
+					processTriggerResult(triggerResult, context.window);
 				}
 				toRemove.add(triggers.getKey());
 			}
@@ -231,35 +316,139 @@ public class NonKeyedWindowOperator<IN, OUT, W extends Window>
 		}
 	}
 
-	protected class TriggerContext implements Trigger.TriggerContext {
-		Trigger<? super IN, ? super W> trigger;
-		W window;
+	/**
+	 * A context object that is given to {@code Trigger} functions to allow them to register
+	 * timer/watermark callbacks.
+	 */
+	protected class Context implements Trigger.TriggerContext {
+		protected W window;
+
+		protected WindowBuffer<IN> windowBuffer;
+
+		protected HashMap<String, Serializable> state;
+
+		// use these to only allow one timer in flight at a time of each type
+		// if the trigger registers another timer this value here will be overwritten,
+		// the timer is not removed from the set of in-flight timers to improve performance.
+		// When a trigger fires it is just checked against the last timer that was set.
+		protected long watermarkTimer;
+		protected long processingTimeTimer;
 
-		public TriggerContext(W window, Trigger<? super IN, ? super W> trigger) {
+		public Context(
+				W window,
+				WindowBuffer<IN> windowBuffer) {
 			this.window = window;
-			this.trigger = trigger;
+			this.windowBuffer = windowBuffer;
+			state = new HashMap<>();
+
+			this.watermarkTimer = -1;
+			this.processingTimeTimer = -1;
+		}
+
+		/** Rebuilds a pane from a checkpoint stream, reading fields in the order writeToState() wrote them. */
+		@SuppressWarnings("unchecked")
+		protected Context(DataInputView in) throws Exception {
+			this.window = windowSerializer.deserialize(in);
+			this.watermarkTimer = in.readLong();
+			this.processingTimeTimer = in.readLong();
+			// Trigger state is length-prefixed; readFully() consumes every byte, whereas read() may stop short.
+			int stateSize = in.readInt();
+			byte[] stateData = new byte[stateSize];
+			in.readFully(stateData);
+			ByteArrayInputStream bais = new ByteArrayInputStream(stateData);
+			state = (HashMap<String, Serializable>) SerializationUtils.deserialize(bais);
+			// Refill a fresh window buffer with the checkpointed elements (count first, then each record).
+			this.windowBuffer = windowBufferFactory.create();
+			int numElements = in.readInt();
+			MultiplexingStreamRecordSerializer<IN> recordSerializer = new MultiplexingStreamRecordSerializer<>(inputSerializer);
+			for (int i = 0; i < numElements; i++) {
+				windowBuffer.storeElement(recordSerializer.deserialize(in).<IN>asRecord());
+			}
+		}
+		/** Writes this pane to the checkpoint stream: window, both timers, length-prefixed trigger state, then the buffered elements. */
+		protected void writeToState(StateBackend.CheckpointStateOutputView out) throws IOException {
+			windowSerializer.serialize(window, out);
+			out.writeLong(watermarkTimer);
+			out.writeLong(processingTimeTimer);
+
+			ByteArrayOutputStream baos = new ByteArrayOutputStream();
+			SerializationUtils.serialize(state, baos);
+			out.writeInt(baos.size());
+			out.write(baos.toByteArray(), 0, baos.size());
+
+			MultiplexingStreamRecordSerializer<IN> recordSerializer = new MultiplexingStreamRecordSerializer<>(inputSerializer);
+			out.writeInt(windowBuffer.size());
+			for (StreamRecord<IN> element: windowBuffer.getElements()) {
+				recordSerializer.serialize(element, out);
+			}
+		}
+
+		@SuppressWarnings("unchecked")
+		public <S extends Serializable> OperatorState<S> getKeyValueState(final String name, final S defaultState) {
+			return new OperatorState<S>() {
+				@Override
+				public S value() throws IOException {
+					Serializable value = state.get(name);
+					if (value == null) {
+						state.put(name, defaultState);
+						value = defaultState;
+					}
+					return (S) value;
+				}
+
+				@Override
+				public void update(S value) throws IOException {
+					state.put(name, value);
+				}
+			};
 		}
 
 		@Override
 		public void registerProcessingTimeTimer(long time) {
-			Set<TriggerContext> triggers = processingTimeTimers.get(time);
+			if (this.processingTimeTimer == time) {
+				// we already have set a trigger for that time
+				return;
+			}
+			Set<Context> triggers = processingTimeTimers.get(time);
 			if (triggers == null) {
 				getRuntimeContext().registerTimer(time, NonKeyedWindowOperator.this);
 				triggers = new HashSet<>();
 				processingTimeTimers.put(time, triggers);
 			}
+			this.processingTimeTimer = time;
 			triggers.add(this);
 		}
 
 		@Override
 		public void registerWatermarkTimer(long time) {
-			Set<TriggerContext> triggers = watermarkTimers.get(time);
+			if (watermarkTimer == time) {
+				// we already have set a trigger for that time
+				return;
+			}
+			Set<Context> triggers = watermarkTimers.get(time);
 			if (triggers == null) {
 				triggers = new HashSet<>();
 				watermarkTimers.put(time, triggers);
 			}
+			this.watermarkTimer = time;
 			triggers.add(this);
 		}
+
+		public Trigger.TriggerResult onProcessingTime(long time) throws Exception {
+			if (time == processingTimeTimer) {
+				return trigger.onTime(time, this);
+			} else {
+				return Trigger.TriggerResult.CONTINUE;
+			}
+		}
+
+		public Trigger.TriggerResult onEventTime(long time) throws Exception {
+			if (time == watermarkTimer) {
+				return trigger.onTime(time, this);
+			} else {
+				return Trigger.TriggerResult.CONTINUE;
+			}
+		}
 	}
 
 	/**
@@ -274,7 +463,7 @@ public class NonKeyedWindowOperator<IN, OUT, W extends Window>
 	}
 
 	@Override
-	public void setOutputType(TypeInformation<OUT> outTypeInfo, ExecutionConfig executionConfig) {
+	public final void setOutputType(TypeInformation<OUT> outTypeInfo, ExecutionConfig executionConfig) {
 		if (userFunction instanceof OutputTypeConfigurable) {
 			@SuppressWarnings("unchecked")
 			OutputTypeConfigurable<OUT> typeConfigurable = (OutputTypeConfigurable<OUT>) userFunction;
@@ -283,12 +472,59 @@ public class NonKeyedWindowOperator<IN, OUT, W extends Window>
 	}
 
 	// ------------------------------------------------------------------------
+	//  Checkpointing
+	// ------------------------------------------------------------------------
+
+	@Override
+	public StreamTaskState snapshotOperatorState(long checkpointId, long timestamp) throws Exception {
+		StreamTaskState taskState = super.snapshotOperatorState(checkpointId, timestamp);
+
+		// we write the panes with the key/value maps into the stream
+		StateBackend.CheckpointStateOutputView out = getStateBackend().createCheckpointStateOutputView(checkpointId, timestamp);
+
+		int numWindows = windows.size();
+		out.writeInt(numWindows);
+		for (Context context: windows.values()) {
+			context.writeToState(out);
+		}
+
+		taskState.setOperatorState(out.closeAndGetHandle());
+		return taskState;
+	}
+
+	@Override
+	public void restoreState(StreamTaskState taskState) throws Exception {
+		super.restoreState(taskState);
+		// State layout: pane count, then each pane as written by Context.writeToState().
+		// The timer maps start empty; open() re-registers the timers recorded in the panes.
+		@SuppressWarnings("unchecked")
+		StateHandle<DataInputView> inputState = (StateHandle<DataInputView>) taskState.getOperatorState();
+		DataInputView in = inputState.getState(getUserCodeClassloader());
+
+		int numWindows = in.readInt();
+		this.windows = new HashMap<>(numWindows);
+		this.processingTimeTimers = new HashMap<>();
+		this.watermarkTimers = new HashMap<>();
+
+		for (int j = 0; j < numWindows; j++) {
+			Context context = new Context(in);
+			windows.put(context.window, context);
+		}
+	}
+
+
+	// ------------------------------------------------------------------------
 	// Getters for testing
 	// ------------------------------------------------------------------------
 
 	@VisibleForTesting
-	public Trigger<? super IN, ? super W> getTriggerTemplate() {
-		return triggerTemplate;
+	public boolean isSetProcessingTime() {
+		return setProcessingTime;
+	}
+
+	@VisibleForTesting
+	public Trigger<? super IN, ? super W> getTrigger() {
+		return trigger;
 	}
 
 	@VisibleForTesting
@@ -300,9 +536,4 @@ public class NonKeyedWindowOperator<IN, OUT, W extends Window>
 	public WindowBufferFactory<? super IN, ? extends WindowBuffer<IN>> getWindowBufferFactory() {
 		return windowBufferFactory;
 	}
-
-	@VisibleForTesting
-	public boolean isSetProcessingTime() {
-		return setProcessingTime;
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/44422697/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
index 82a3f9a..0b3274f 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
@@ -18,13 +18,16 @@
 package org.apache.flink.streaming.runtime.operators.windowing;
 
 import com.google.common.annotations.VisibleForTesting;
-
+import org.apache.commons.lang.SerializationUtils;
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.OperatorState;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.InputTypeConfigurable;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.runtime.state.StateHandle;
 import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
 import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
 import org.apache.flink.streaming.api.operators.ChainingStrategy;
@@ -38,10 +41,16 @@ import org.apache.flink.streaming.api.windowing.windows.Window;
 import org.apache.flink.streaming.runtime.operators.Triggerable;
 import org.apache.flink.streaming.runtime.operators.windowing.buffers.WindowBuffer;
 import org.apache.flink.streaming.runtime.operators.windowing.buffers.WindowBufferFactory;
+import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -84,49 +93,77 @@ public class WindowOperator<K, IN, OUT, W extends Window>
 
 	private static final Logger LOG = LoggerFactory.getLogger(WindowOperator.class);
 
+	// ------------------------------------------------------------------------
+	// Configuration values and user functions
+	// ------------------------------------------------------------------------
+
 	private final WindowAssigner<? super IN, W> windowAssigner;
 
 	private final KeySelector<IN, K> keySelector;
 
-	private final Trigger<? super IN, ? super W> triggerTemplate;
+	private final Trigger<? super IN, ? super W> trigger;
 
 	private final WindowBufferFactory<? super IN, ? extends WindowBuffer<IN>> windowBufferFactory;
 
 	/**
-	 * The windows (panes) that are currently in-flight. Each pane has a {@code WindowBuffer}
-	 * and a {@code TriggerContext} that stores the {@code Trigger} for that pane.
+	 * If this is true, the current processing time is set as the timestamp of incoming elements.
+	 * This is for use with a {@link org.apache.flink.streaming.api.windowing.evictors.TimeEvictor}
+	 * if eviction should happen based on processing time.
+	 */
+	private boolean setProcessingTime = false;
+
+	/**
+	 * This is used to copy the incoming element because it can be put into several window
+	 * buffers.
+	 */
+	private TypeSerializer<IN> inputSerializer;
+
+	/**
+	 * For serializing the key in checkpoints.
 	 */
-	protected transient Map<K, Map<W, Tuple2<WindowBuffer<IN>, TriggerContext>>> windows;
+	private final TypeSerializer<K> keySerializer;
+
+	/**
+	 * For serializing the window in checkpoints.
+	 */
+	private final TypeSerializer<W> windowSerializer;
+
+	// ------------------------------------------------------------------------
+	// State that is not checkpointed
+	// ------------------------------------------------------------------------
 
 	/**
 	 * Processing time timers that are currently in-flight.
 	 */
-	private transient Map<Long, Set<TriggerContext>> processingTimeTimers;
+	private transient Map<Long, Set<Context>> processingTimeTimers;
 
 	/**
 	 * Current waiting watermark callbacks.
 	 */
-	private transient Map<Long, Set<TriggerContext>> watermarkTimers;
+	private transient Map<Long, Set<Context>> watermarkTimers;
 
 	/**
 	 * This is given to the {@code WindowFunction} for emitting elements with a given timestamp.
 	 */
 	protected transient TimestampedCollector<OUT> timestampedCollector;
 
+	// ------------------------------------------------------------------------
+	// State that needs to be checkpointed
+	// ------------------------------------------------------------------------
+
 	/**
-	 * If this is true. The current processing time is set as the timestamp of incoming elements.
-	 * This for use with a {@link org.apache.flink.streaming.api.windowing.evictors.TimeEvictor}
-	 * if eviction should happen based on processing time.
+	 * The windows (panes) that are currently in-flight, per key. Each pane is tracked by a
+	 * {@code Context} that holds its {@code WindowBuffer} and registered timers.
 	 */
-	private boolean setProcessingTime = false;
-
-	private TypeSerializer<IN> inputSerializer;
+	protected transient Map<K, Map<W, Context>> windows;
 
 	/**
 	 * Creates a new {@code WindowOperator} based on the given policies and user functions.
 	 */
 	public WindowOperator(WindowAssigner<? super IN, W> windowAssigner,
+			TypeSerializer<W> windowSerializer,
 			KeySelector<IN, K> keySelector,
+			TypeSerializer<K> keySerializer,
 			WindowBufferFactory<? super IN, ? extends WindowBuffer<IN>> windowBufferFactory,
 			WindowFunction<IN, OUT, K, W> windowFunction,
 			Trigger<? super IN, ? super W> trigger) {
@@ -134,27 +171,26 @@ public class WindowOperator<K, IN, OUT, W extends Window>
 		super(windowFunction);
 
 		this.windowAssigner = requireNonNull(windowAssigner);
+		this.windowSerializer = windowSerializer;
 		this.keySelector = requireNonNull(keySelector);
+		this.keySerializer = requireNonNull(keySerializer);
 
 		this.windowBufferFactory = requireNonNull(windowBufferFactory);
-		this.triggerTemplate = requireNonNull(trigger);
+		this.trigger = requireNonNull(trigger);
 
 		setChainingStrategy(ChainingStrategy.ALWAYS);
-//		forceInputCopy();
 	}
 
 	@Override
 	@SuppressWarnings("unchecked")
-	public void setInputType(TypeInformation<?> type, ExecutionConfig executionConfig) {
+	public final void setInputType(TypeInformation<?> type, ExecutionConfig executionConfig) {
 		inputSerializer = (TypeSerializer<IN>) type.createSerializer(executionConfig);
 	}
 
 	@Override
-	public void open() throws Exception {
+	public final void open() throws Exception {
 		super.open();
-		windows = new HashMap<>();
-		watermarkTimers = new HashMap<>();
-		processingTimeTimers = new HashMap<>();
+
 		timestampedCollector = new TimestampedCollector<>(output);
 
 		if (inputSerializer == null) {
@@ -163,17 +199,53 @@ public class WindowOperator<K, IN, OUT, W extends Window>
 
 		windowBufferFactory.setRuntimeContext(getRuntimeContext());
 		windowBufferFactory.open(getUserFunctionParameters());
+
+
+		// these could already be initialized from restoreState()
+		if (watermarkTimers == null) {
+			watermarkTimers = new HashMap<>();
+		}
+		if (processingTimeTimers == null) {
+			processingTimeTimers = new HashMap<>();
+		}
+		if (windows == null) {
+			windows = new HashMap<>();
+		}
+
+		// re-register timers that this window context had set
+		for (Map.Entry<K, Map<W, Context>> entry: windows.entrySet()) {
+			Map<W, Context> keyWindows = entry.getValue();
+			for (Context context: keyWindows.values()) {
+				if (context.processingTimeTimer > 0) {
+					Set<Context> triggers = processingTimeTimers.get(context.processingTimeTimer);
+					if (triggers == null) {
+						getRuntimeContext().registerTimer(context.processingTimeTimer, WindowOperator.this);
+						triggers = new HashSet<>();
+						processingTimeTimers.put(context.processingTimeTimer, triggers);
+					}
+					triggers.add(context);
+				}
+				if (context.watermarkTimer > 0) {
+					Set<Context> triggers = watermarkTimers.get(context.watermarkTimer);
+					if (triggers == null) {
+						triggers = new HashSet<>();
+						watermarkTimers.put(context.watermarkTimer, triggers);
+					}
+					triggers.add(context);
+				}
+
+			}
+		}
 	}
 
 	@Override
-	public void close() throws Exception {
+	public final void close() throws Exception {
 		super.close();
 		// emit the elements that we still keep
-		for (Map.Entry<K, Map<W, Tuple2<WindowBuffer<IN>, TriggerContext>>> entry: windows.entrySet()) {
-			K key = entry.getKey();
-			Map<W, Tuple2<WindowBuffer<IN>, TriggerContext>> keyWindows = entry.getValue();
-			for (W window: keyWindows.keySet()) {
-				emitWindow(key, window, false);
+		for (Map.Entry<K, Map<W, Context>> entry: windows.entrySet()) {
+			Map<W, Context> keyWindows = entry.getValue();
+			for (Context window: keyWindows.values()) {
+				emitWindow(window);
 			}
 		}
 		windows.clear();
@@ -182,77 +254,81 @@ public class WindowOperator<K, IN, OUT, W extends Window>
 
 	@Override
 	@SuppressWarnings("unchecked")
-	public void processElement(StreamRecord<IN> element) throws Exception {
+	public final void processElement(StreamRecord<IN> element) throws Exception {
 		if (setProcessingTime) {
 			element.replace(element.getValue(), System.currentTimeMillis());
 		}
+
 		Collection<W> elementWindows = windowAssigner.assignWindows(element.getValue(), element.getTimestamp());
 
 		K key = keySelector.getKey(element.getValue());
 
-		Map<W, Tuple2<WindowBuffer<IN>, TriggerContext>> keyWindows = windows.get(key);
+		Map<W, Context> keyWindows = windows.get(key);
 		if (keyWindows == null) {
 			keyWindows = new HashMap<>();
 			windows.put(key, keyWindows);
 		}
 
 		for (W window: elementWindows) {
-			Tuple2<WindowBuffer<IN>, TriggerContext> bufferAndTrigger = keyWindows.get(window);
-			if (bufferAndTrigger == null) {
-				bufferAndTrigger = new Tuple2<>();
-				bufferAndTrigger.f0 = windowBufferFactory.create();
-				bufferAndTrigger.f1 = new TriggerContext(key, window, triggerTemplate.duplicate());
-				keyWindows.put(window, bufferAndTrigger);
+			Context context = keyWindows.get(window);
+			if (context == null) {
+				WindowBuffer<IN> windowBuffer = windowBufferFactory.create();
+				context = new Context(key, window, windowBuffer);
+				keyWindows.put(window, context);
 			}
 			StreamRecord<IN> elementCopy = new StreamRecord<>(inputSerializer.copy(element.getValue()), element.getTimestamp());
-			bufferAndTrigger.f0.storeElement(elementCopy);
-			Trigger.TriggerResult triggerResult = bufferAndTrigger.f1.trigger.onElement(elementCopy.getValue(), elementCopy.getTimestamp(), window, bufferAndTrigger.f1);
+			context.windowBuffer.storeElement(elementCopy);
+			Trigger.TriggerResult triggerResult = trigger.onElement(elementCopy.getValue(), elementCopy.getTimestamp(), window, context);
 			processTriggerResult(triggerResult, key, window);
 		}
 	}
 
-	protected void emitWindow(K key, W window, boolean purge) throws Exception {
-		timestampedCollector.setTimestamp(window.getEnd());
-
-		Map<W, Tuple2<WindowBuffer<IN>, TriggerContext>> keyWindows = windows.get(key);
-
-		if (keyWindows == null) {
-			LOG.debug("Window {} for key {} already gone.", window, key);
-			return;
-		}
-
-		Tuple2<WindowBuffer<IN>, TriggerContext> bufferAndTrigger;
-		if (purge) {
-			bufferAndTrigger = keyWindows.remove(window);
-		} else {
-			bufferAndTrigger = keyWindows.get(window);
-		}
-
-		if (bufferAndTrigger == null) {
-			LOG.debug("Window {} for key {} already gone.", window, key);
-			return;
-		}
+	protected void emitWindow(Context context) throws Exception {
+		timestampedCollector.setTimestamp(context.window.maxTimestamp());
 
-
-		userFunction.apply(key,
-				window,
-				bufferAndTrigger.f0.getUnpackedElements(),
+		userFunction.apply(context.key,
+				context.window,
+				context.windowBuffer.getUnpackedElements(),
 				timestampedCollector);
-
-		if (keyWindows.isEmpty()) {
-			windows.remove(key);
-		}
 	}
 
 	private void processTriggerResult(Trigger.TriggerResult triggerResult, K key, W window) throws Exception {
 		switch (triggerResult) {
-			case FIRE:
-				emitWindow(key, window, false);
+			case FIRE: {
+				Map<W, Context> keyWindows = windows.get(key);
+				if (keyWindows == null) {
+					LOG.debug("Window {} for key {} already gone.", window, key);
+					return;
+				}
+				Context context = keyWindows.get(window);
+				if (context == null) {
+					LOG.debug("Window {} for key {} already gone.", window, key);
+					return;
+				}
+
+
+				emitWindow(context);
 				break;
+			}
 
-			case FIRE_AND_PURGE:
-				emitWindow(key, window, true);
+			case FIRE_AND_PURGE: {
+				Map<W, Context> keyWindows = windows.get(key);
+				if (keyWindows == null) {
+					LOG.debug("Window {} for key {} already gone.", window, key);
+					return;
+				}
+				Context context = keyWindows.remove(window);
+				if (context == null) {
+					LOG.debug("Window {} for key {} already gone.", window, key);
+					return;
+				}
+				if (keyWindows.isEmpty()) {
+					windows.remove(key);
+				}
+
+				emitWindow(context);
 				break;
+			}
 
 			case CONTINUE:
-				// ingore
+				// ignore
@@ -260,14 +336,14 @@ public class WindowOperator<K, IN, OUT, W extends Window>
 	}
 
 	@Override
-	public void processWatermark(Watermark mark) throws Exception {
+	public final void processWatermark(Watermark mark) throws Exception {
 		Set<Long> toRemove = new HashSet<>();
 
-		for (Map.Entry<Long, Set<TriggerContext>> triggers: watermarkTimers.entrySet()) {
+		for (Map.Entry<Long, Set<Context>> triggers: watermarkTimers.entrySet()) {
 			if (triggers.getKey() <= mark.getTimestamp()) {
-				for (TriggerContext trigger: triggers.getValue()) {
-					Trigger.TriggerResult triggerResult = trigger.trigger.onTime(mark.getTimestamp(), trigger);
-					processTriggerResult(triggerResult, trigger.key, trigger.window);
+				for (Context context: triggers.getValue()) {
+					Trigger.TriggerResult triggerResult = context.onEventTime(triggers.getKey());
+					processTriggerResult(triggerResult, context.key, context.window);
 				}
 				toRemove.add(triggers.getKey());
 			}
@@ -280,14 +356,14 @@ public class WindowOperator<K, IN, OUT, W extends Window>
 	}
 
 	@Override
-	public void trigger(long time) throws Exception {
+	public final void trigger(long time) throws Exception {
 		Set<Long> toRemove = new HashSet<>();
 
-		for (Map.Entry<Long, Set<TriggerContext>> triggers: processingTimeTimers.entrySet()) {
-			if (triggers.getKey() < time) {
-				for (TriggerContext trigger: triggers.getValue()) {
-					Trigger.TriggerResult triggerResult = trigger.trigger.onTime(time, trigger);
-					processTriggerResult(triggerResult, trigger.key, trigger.window);
+		for (Map.Entry<Long, Set<Context>> triggers: processingTimeTimers.entrySet()) {
+			if (triggers.getKey() <= time) { // inclusive, same comparison as processWatermark()
+				for (Context context: triggers.getValue()) {
+					Trigger.TriggerResult triggerResult = context.onProcessingTime(triggers.getKey()); // registered time, not callback time
+					processTriggerResult(triggerResult, context.key, context.window);
 				}
 				toRemove.add(triggers.getKey());
 			}
@@ -302,37 +378,146 @@ public class WindowOperator<K, IN, OUT, W extends Window>
 	 * A context object that is given to {@code Trigger} functions to allow them to register
 	 * timer/watermark callbacks.
 	 */
-	protected class TriggerContext implements Trigger.TriggerContext {
-		Trigger<? super IN, ? super W> trigger;
-		K key;
-		W window;
+	protected class Context implements Trigger.TriggerContext {
+		protected K key;
+		protected W window;
+
+		protected WindowBuffer<IN> windowBuffer;
 
-		public TriggerContext(K key, W window, Trigger<? super IN, ? super W> trigger) {
+		protected HashMap<String, Serializable> state;
+
+		// use these to only allow one timer in flight at a time of each type
+		// if the trigger registers another timer this value here will be overwritten,
+		// the timer is not removed from the set of in-flight timers to improve performance.
+		// When a trigger fires it is just checked against the last timer that was set.
+		protected long watermarkTimer;
+		protected long processingTimeTimer;
+
+		public Context(K key,
+				W window,
+				WindowBuffer<IN> windowBuffer) {
 			this.key = key;
 			this.window = window;
-			this.trigger = trigger;
+			this.windowBuffer = windowBuffer;
+			state = new HashMap<>();
+
+			this.watermarkTimer = -1;
+			this.processingTimeTimer = -1;
+		}
+
+		/**
+		 * Restores a {@code Context} from a {@link DataInputView}. The fields are read in
+		 * exactly the order in which
+		 * {@link #writeToState(StateBackend.CheckpointStateOutputView)} wrote them.
+		 */
+		@SuppressWarnings("unchecked")
+		protected Context(DataInputView in) throws Exception {
+			this.key = keySerializer.deserialize(in);
+			this.window = windowSerializer.deserialize(in);
+			this.watermarkTimer = in.readLong();
+			this.processingTimeTimer = in.readLong();
+
+			int stateSize = in.readInt();
+			byte[] stateData = new byte[stateSize];
+			in.readFully(stateData); // read() may return before the whole buffer is filled
+			ByteArrayInputStream bais = new ByteArrayInputStream(stateData);
+			state = (HashMap<String, Serializable>) SerializationUtils.deserialize(bais);
+
+			this.windowBuffer = windowBufferFactory.create();
+			int numElements = in.readInt();
+			MultiplexingStreamRecordSerializer<IN> recordSerializer = new MultiplexingStreamRecordSerializer<>(inputSerializer);
+			for (int i = 0; i < numElements; i++) {
+				windowBuffer.storeElement(recordSerializer.deserialize(in).<IN>asRecord());
+			}
+		}
+
+		/**
+		 * Writes the {@code Context} to the given state checkpoint output.
+		 */
+		protected void writeToState(StateBackend.CheckpointStateOutputView out) throws IOException {
+			keySerializer.serialize(key, out);
+			windowSerializer.serialize(window, out);
+			out.writeLong(watermarkTimer);
+			out.writeLong(processingTimeTimer);
+
+			ByteArrayOutputStream baos = new ByteArrayOutputStream();
+			SerializationUtils.serialize(state, baos);
+			out.writeInt(baos.size());
+			out.write(baos.toByteArray(), 0, baos.size());
+
+			MultiplexingStreamRecordSerializer<IN> recordSerializer = new MultiplexingStreamRecordSerializer<>(inputSerializer);
+			out.writeInt(windowBuffer.size());
+			for (StreamRecord<IN> element: windowBuffer.getElements()) {
+				recordSerializer.serialize(element, out);
+			}
+		}
+
+		@SuppressWarnings("unchecked")
+		public <S extends Serializable> OperatorState<S> getKeyValueState(final String name, final S defaultState) {
+			return new OperatorState<S>() {
+				@Override
+				public S value() throws IOException {
+					Serializable value = state.get(name);
+					if (value == null) {
+						state.put(name, defaultState);
+						value = defaultState;
+					}
+					return (S) value;
+				}
+
+				@Override
+				public void update(S value) throws IOException {
+					state.put(name, value);
+				}
+			};
 		}
 
 		@Override
 		public void registerProcessingTimeTimer(long time) {
-			Set<TriggerContext> triggers = processingTimeTimers.get(time);
+			if (this.processingTimeTimer == time) {
+				// we already have set a trigger for that time
+				return;
+			}
+			Set<Context> triggers = processingTimeTimers.get(time);
 			if (triggers == null) {
 				getRuntimeContext().registerTimer(time, WindowOperator.this);
 				triggers = new HashSet<>();
 				processingTimeTimers.put(time, triggers);
 			}
+			this.processingTimeTimer = time;
 			triggers.add(this);
 		}
 
 		@Override
 		public void registerWatermarkTimer(long time) {
-			Set<TriggerContext> triggers = watermarkTimers.get(time);
+			if (watermarkTimer == time) {
+				// we already have set a trigger for that time
+				return;
+			}
+			Set<Context> triggers = watermarkTimers.get(time);
 			if (triggers == null) {
 				triggers = new HashSet<>();
 				watermarkTimers.put(time, triggers);
 			}
+			this.watermarkTimer = time;
 			triggers.add(this);
 		}
+
+		public Trigger.TriggerResult onProcessingTime(long time) throws Exception {
+			if (time == processingTimeTimer) {
+				return trigger.onTime(time, this);
+			} else {
+				return Trigger.TriggerResult.CONTINUE;
+			}
+		}
+
+		public Trigger.TriggerResult onEventTime(long time) throws Exception {
+			if (time == watermarkTimer) {
+				return trigger.onTime(time, this);
+			} else {
+				return Trigger.TriggerResult.CONTINUE;
+			}
+		}
 	}
 
 	/**
@@ -347,7 +532,7 @@ public class WindowOperator<K, IN, OUT, W extends Window>
 	}
 
 	@Override
-	public void setOutputType(TypeInformation<OUT> outTypeInfo, ExecutionConfig executionConfig) {
+	public final void setOutputType(TypeInformation<OUT> outTypeInfo, ExecutionConfig executionConfig) {
 		if (userFunction instanceof OutputTypeConfigurable) {
 			@SuppressWarnings("unchecked")
 			OutputTypeConfigurable<OUT> typeConfigurable = (OutputTypeConfigurable<OUT>) userFunction;
@@ -356,6 +541,60 @@ public class WindowOperator<K, IN, OUT, W extends Window>
 	}
 
 	// ------------------------------------------------------------------------
+	//  Checkpointing
+	// ------------------------------------------------------------------------
+
+	@Override
+	public StreamTaskState snapshotOperatorState(long checkpointId, long timestamp) throws Exception {
+		StreamTaskState taskState = super.snapshotOperatorState(checkpointId, timestamp);
+
+		// we write the panes with the key/value maps into the stream
+		StateBackend.CheckpointStateOutputView out = getStateBackend().createCheckpointStateOutputView(checkpointId, timestamp);
+
+		int numKeys = windows.size();
+		out.writeInt(numKeys);
+
+		for (Map.Entry<K, Map<W, Context>> keyWindows: windows.entrySet()) {
+			int numWindows = keyWindows.getValue().size();
+			out.writeInt(numWindows);
+			for (Context context: keyWindows.getValue().values()) {
+				context.writeToState(out);
+			}
+		}
+
+		taskState.setOperatorState(out.closeAndGetHandle());
+		return taskState;
+	}
+
+	@Override
+	public void restoreState(StreamTaskState taskState) throws Exception {
+		super.restoreState(taskState);
+
+
+		@SuppressWarnings("unchecked")
+		StateHandle<DataInputView> inputState = (StateHandle<DataInputView>) taskState.getOperatorState();
+		DataInputView in = inputState.getState(getUserCodeClassloader());
+
+		int numKeys = in.readInt();
+		this.windows = new HashMap<>(numKeys);
+		this.processingTimeTimers = new HashMap<>();
+		this.watermarkTimers = new HashMap<>();
+
+		for (int i = 0; i < numKeys; i++) {
+			int numWindows = in.readInt();
+			for (int j = 0; j < numWindows; j++) {
+				Context context = new Context(in);
+				Map<W, Context> keyWindows = windows.get(context.key);
+				if (keyWindows == null) {
+					keyWindows = new HashMap<>(numWindows);
+					windows.put(context.key, keyWindows);
+				}
+				keyWindows.put(context.window, context);
+			}
+		}
+	}
+
+	// ------------------------------------------------------------------------
 	// Getters for testing
 	// ------------------------------------------------------------------------
 
@@ -365,8 +604,8 @@ public class WindowOperator<K, IN, OUT, W extends Window>
 	}
 
 	@VisibleForTesting
-	public Trigger<? super IN, ? super W> getTriggerTemplate() {
-		return triggerTemplate;
+	public Trigger<? super IN, ? super W> getTrigger() {
+		return trigger;
 	}
 
 	@VisibleForTesting

http://git-wip-us.apache.org/repos/asf/flink/blob/44422697/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java
index 4fa16ac..45ef29f 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java
@@ -71,7 +71,7 @@ public class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase
 		Assert.assertTrue(operator1 instanceof NonKeyedWindowOperator);
 		NonKeyedWindowOperator winOperator1 = (NonKeyedWindowOperator) operator1;
 		Assert.assertFalse(winOperator1.isSetProcessingTime());
-		Assert.assertTrue(winOperator1.getTriggerTemplate() instanceof WatermarkTrigger);
+		Assert.assertTrue(winOperator1.getTrigger() instanceof WatermarkTrigger);
 		Assert.assertTrue(winOperator1.getWindowAssigner() instanceof SlidingTimeWindows);
 		Assert.assertTrue(winOperator1.getWindowBufferFactory() instanceof PreAggregatingHeapWindowBuffer.Factory);
 
@@ -94,7 +94,7 @@ public class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase
 		Assert.assertTrue(operator2 instanceof NonKeyedWindowOperator);
 		NonKeyedWindowOperator winOperator2 = (NonKeyedWindowOperator) operator2;
 		Assert.assertFalse(winOperator2.isSetProcessingTime());
-		Assert.assertTrue(winOperator2.getTriggerTemplate() instanceof WatermarkTrigger);
+		Assert.assertTrue(winOperator2.getTrigger() instanceof WatermarkTrigger);
 		Assert.assertTrue(winOperator2.getWindowAssigner() instanceof TumblingTimeWindows);
 		Assert.assertTrue(winOperator2.getWindowBufferFactory() instanceof HeapWindowBuffer.Factory);
 	}
@@ -119,7 +119,7 @@ public class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase
 		Assert.assertTrue(operator1 instanceof NonKeyedWindowOperator);
 		NonKeyedWindowOperator winOperator1 = (NonKeyedWindowOperator) operator1;
 		Assert.assertTrue(winOperator1.isSetProcessingTime());
-		Assert.assertTrue(winOperator1.getTriggerTemplate() instanceof CountTrigger);
+		Assert.assertTrue(winOperator1.getTrigger() instanceof CountTrigger);
 		Assert.assertTrue(winOperator1.getWindowAssigner() instanceof SlidingTimeWindows);
 		Assert.assertTrue(winOperator1.getWindowBufferFactory() instanceof PreAggregatingHeapWindowBuffer.Factory);
 
@@ -143,7 +143,7 @@ public class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase
 		Assert.assertTrue(operator2 instanceof NonKeyedWindowOperator);
 		NonKeyedWindowOperator winOperator2 = (NonKeyedWindowOperator) operator2;
-		Assert.assertTrue(winOperator1.isSetProcessingTime());
-		Assert.assertTrue(winOperator2.getTriggerTemplate() instanceof CountTrigger);
+		Assert.assertTrue(winOperator2.isSetProcessingTime());
+		Assert.assertTrue(winOperator2.getTrigger() instanceof CountTrigger);
 		Assert.assertTrue(winOperator2.getWindowAssigner() instanceof TumblingTimeWindows);
 		Assert.assertTrue(winOperator2.getWindowBufferFactory() instanceof HeapWindowBuffer.Factory);
 	}
@@ -168,7 +168,7 @@ public class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase
 		Assert.assertTrue(operator1 instanceof EvictingNonKeyedWindowOperator);
 		EvictingNonKeyedWindowOperator winOperator1 = (EvictingNonKeyedWindowOperator) operator1;
 		Assert.assertFalse(winOperator1.isSetProcessingTime());
-		Assert.assertTrue(winOperator1.getTriggerTemplate() instanceof WatermarkTrigger);
+		Assert.assertTrue(winOperator1.getTrigger() instanceof WatermarkTrigger);
 		Assert.assertTrue(winOperator1.getWindowAssigner() instanceof SlidingTimeWindows);
 		Assert.assertTrue(winOperator1.getEvictor() instanceof CountEvictor);
 		Assert.assertTrue(winOperator1.getWindowBufferFactory() instanceof HeapWindowBuffer.Factory);
@@ -194,7 +194,7 @@ public class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase
 		Assert.assertTrue(operator2 instanceof EvictingNonKeyedWindowOperator);
 		EvictingNonKeyedWindowOperator winOperator2 = (EvictingNonKeyedWindowOperator) operator2;
 		Assert.assertFalse(winOperator2.isSetProcessingTime());
-		Assert.assertTrue(winOperator2.getTriggerTemplate() instanceof CountTrigger);
+		Assert.assertTrue(winOperator2.getTrigger() instanceof CountTrigger);
 		Assert.assertTrue(winOperator2.getWindowAssigner() instanceof TumblingTimeWindows);
 		Assert.assertTrue(winOperator2.getEvictor() instanceof TimeEvictor);
 		Assert.assertTrue(winOperator2.getWindowBufferFactory() instanceof HeapWindowBuffer.Factory);

http://git-wip-us.apache.org/repos/asf/flink/blob/44422697/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingNonKeyedWindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingNonKeyedWindowOperatorTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingNonKeyedWindowOperatorTest.java
index 3139941..39033cc 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingNonKeyedWindowOperatorTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingNonKeyedWindowOperatorTest.java
@@ -54,6 +54,7 @@ public class EvictingNonKeyedWindowOperatorTest {
 
 		EvictingNonKeyedWindowOperator<Tuple2<String, Integer>, Tuple2<String, Integer>, GlobalWindow> operator = new EvictingNonKeyedWindowOperator<>(
 				GlobalWindows.create(),
+				new GlobalWindow.Serializer(),
 				new HeapWindowBuffer.Factory<Tuple2<String, Integer>>(),
 				new ReduceAllWindowFunction<GlobalWindow, Tuple2<String, Integer>>(new SumReducer(closeCalled)),
 				CountTrigger.of(WINDOW_SLIDE),

http://git-wip-us.apache.org/repos/asf/flink/blob/44422697/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java
index 3d9605e..afc65d5 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java
@@ -17,8 +17,10 @@
  */
 package org.apache.flink.streaming.runtime.operators.windowing;
 
+import jdk.nashorn.internal.objects.Global;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.RichReduceFunction;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.TypeInfoParser;
@@ -54,7 +56,9 @@ public class EvictingWindowOperatorTest {
 
 		EvictingWindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, GlobalWindow> operator = new EvictingWindowOperator<>(
 				GlobalWindows.create(),
+				new GlobalWindow.Serializer(),
 				new TupleKeySelector(),
+				BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
 				new HeapWindowBuffer.Factory<Tuple2<String, Integer>>(),
 				new ReduceWindowFunction<String, GlobalWindow, Tuple2<String, Integer>>(new SumReducer(closeCalled)),
 				CountTrigger.of(WINDOW_SLIDE),

http://git-wip-us.apache.org/repos/asf/flink/blob/44422697/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperatorTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperatorTest.java
index 6cc8931..a91d957 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperatorTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperatorTest.java
@@ -76,6 +76,7 @@ public class NonKeyedWindowOperatorTest {
 
 		NonKeyedWindowOperator<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> operator = new NonKeyedWindowOperator<>(
 				SlidingTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS), Time.of(WINDOW_SLIDE, TimeUnit.SECONDS)),
+				new TimeWindow.Serializer(),
 				windowBufferFactory,
 				new ReduceAllWindowFunction<TimeWindow, Tuple2<String, Integer>>(new SumReducer()),
 				WatermarkTrigger.create());
@@ -156,6 +157,7 @@ public class NonKeyedWindowOperatorTest {
 
 		NonKeyedWindowOperator<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> operator = new NonKeyedWindowOperator<>(
 				TumblingTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)),
+				new TimeWindow.Serializer(),
 				windowBufferFactory,
 				new ReduceAllWindowFunction<TimeWindow, Tuple2<String, Integer>>(new SumReducer()),
 				WatermarkTrigger.create());
@@ -234,6 +236,7 @@ public class NonKeyedWindowOperatorTest {
 
 		NonKeyedWindowOperator<Tuple2<String, Integer>, Tuple2<String, Integer>, GlobalWindow> operator = new NonKeyedWindowOperator<>(
 				GlobalWindows.create(),
+				new GlobalWindow.Serializer(),
 				windowBufferFactory,
 				new ReduceAllWindowFunction<GlobalWindow, Tuple2<String, Integer>>(new SumReducer()),
 				ContinuousWatermarkTrigger.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)));
@@ -312,6 +315,7 @@ public class NonKeyedWindowOperatorTest {
 
 		NonKeyedWindowOperator<Tuple2<String, Integer>, Tuple2<String, Integer>, GlobalWindow> operator = new NonKeyedWindowOperator<>(
 				GlobalWindows.create(),
+				new GlobalWindow.Serializer(),
 				windowBufferFactory,
 				new ReduceAllWindowFunction<GlobalWindow, Tuple2<String, Integer>>(new SumReducer()),
 				PurgingTrigger.of(CountTrigger.of(WINDOW_SIZE)));

http://git-wip-us.apache.org/repos/asf/flink/blob/44422697/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
index d387df0..e825b88 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
@@ -19,6 +19,7 @@ package org.apache.flink.streaming.runtime.operators.windowing;
 
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.RichReduceFunction;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.TypeInfoParser;
@@ -76,7 +77,9 @@ public class WindowOperatorTest {
 
 		WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> operator = new WindowOperator<>(
 				SlidingTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS), Time.of(WINDOW_SLIDE, TimeUnit.SECONDS)),
+				new TimeWindow.Serializer(),
 				new TupleKeySelector(),
+				BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
 				windowBufferFactory,
 				new ReduceWindowFunction<String, TimeWindow, Tuple2<String, Integer>>(new SumReducer()),
 				WatermarkTrigger.create());
@@ -163,7 +166,9 @@ public class WindowOperatorTest {
 
 		WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> operator = new WindowOperator<>(
 				TumblingTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)),
+				new TimeWindow.Serializer(),
 				new TupleKeySelector(),
+				BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
 				windowBufferFactory,
 				new ReduceWindowFunction<String, TimeWindow, Tuple2<String, Integer>>(new SumReducer()),
 				WatermarkTrigger.create());
@@ -246,7 +251,9 @@ public class WindowOperatorTest {
 
 		WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, GlobalWindow> operator = new WindowOperator<>(
 				GlobalWindows.create(),
+				new GlobalWindow.Serializer(),
 				new TupleKeySelector(),
+				BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
 				windowBufferFactory,
 				new ReduceWindowFunction<String, GlobalWindow, Tuple2<String, Integer>>(new SumReducer()),
 				ContinuousWatermarkTrigger.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)));
@@ -331,7 +338,9 @@ public class WindowOperatorTest {
 
 		WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, GlobalWindow> operator = new WindowOperator<>(
 				GlobalWindows.create(),
+				new GlobalWindow.Serializer(),
 				new TupleKeySelector(),
+				BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
 				windowBufferFactory,
 				new ReduceWindowFunction<String, GlobalWindow, Tuple2<String, Integer>>(new SumReducer()),
 				PurgingTrigger.of(CountTrigger.of(WINDOW_SIZE)));

http://git-wip-us.apache.org/repos/asf/flink/blob/44422697/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java
index 10fe734..02ec820 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java
@@ -116,7 +116,7 @@ public class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
 		Assert.assertTrue(operator1 instanceof WindowOperator);
 		WindowOperator winOperator1 = (WindowOperator) operator1;
 		Assert.assertFalse(winOperator1.isSetProcessingTime());
-		Assert.assertTrue(winOperator1.getTriggerTemplate() instanceof WatermarkTrigger);
+		Assert.assertTrue(winOperator1.getTrigger() instanceof WatermarkTrigger);
 		Assert.assertTrue(winOperator1.getWindowAssigner() instanceof SlidingTimeWindows);
 		Assert.assertTrue(winOperator1.getWindowBufferFactory() instanceof PreAggregatingHeapWindowBuffer.Factory);
 
@@ -140,7 +140,7 @@ public class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
 		Assert.assertTrue(operator2 instanceof WindowOperator);
 		WindowOperator winOperator2 = (WindowOperator) operator2;
 		Assert.assertFalse(winOperator2.isSetProcessingTime());
-		Assert.assertTrue(winOperator2.getTriggerTemplate() instanceof WatermarkTrigger);
+		Assert.assertTrue(winOperator2.getTrigger() instanceof WatermarkTrigger);
 		Assert.assertTrue(winOperator2.getWindowAssigner() instanceof TumblingTimeWindows);
 		Assert.assertTrue(winOperator2.getWindowBufferFactory() instanceof HeapWindowBuffer.Factory);
 	}
@@ -166,7 +166,7 @@ public class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
 		Assert.assertTrue(operator1 instanceof WindowOperator);
 		WindowOperator winOperator1 = (WindowOperator) operator1;
 		Assert.assertTrue(winOperator1.isSetProcessingTime());
-		Assert.assertTrue(winOperator1.getTriggerTemplate() instanceof CountTrigger);
+		Assert.assertTrue(winOperator1.getTrigger() instanceof CountTrigger);
 		Assert.assertTrue(winOperator1.getWindowAssigner() instanceof SlidingTimeWindows);
 		Assert.assertTrue(winOperator1.getWindowBufferFactory() instanceof PreAggregatingHeapWindowBuffer.Factory);
 
@@ -191,7 +191,7 @@ public class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
 		Assert.assertTrue(operator2 instanceof WindowOperator);
 		WindowOperator winOperator2 = (WindowOperator) operator2;
 		Assert.assertTrue(winOperator2.isSetProcessingTime());
-		Assert.assertTrue(winOperator2.getTriggerTemplate() instanceof CountTrigger);
+		Assert.assertTrue(winOperator2.getTrigger() instanceof CountTrigger);
 		Assert.assertTrue(winOperator2.getWindowAssigner() instanceof TumblingTimeWindows);
 		Assert.assertTrue(winOperator2.getWindowBufferFactory() instanceof HeapWindowBuffer.Factory);
 	}
@@ -217,7 +217,7 @@ public class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
 		Assert.assertTrue(operator1 instanceof EvictingWindowOperator);
 		EvictingWindowOperator winOperator1 = (EvictingWindowOperator) operator1;
 		Assert.assertFalse(winOperator1.isSetProcessingTime());
-		Assert.assertTrue(winOperator1.getTriggerTemplate() instanceof WatermarkTrigger);
+		Assert.assertTrue(winOperator1.getTrigger() instanceof WatermarkTrigger);
 		Assert.assertTrue(winOperator1.getWindowAssigner() instanceof SlidingTimeWindows);
 		Assert.assertTrue(winOperator1.getEvictor() instanceof CountEvictor);
 		Assert.assertTrue(winOperator1.getWindowBufferFactory() instanceof HeapWindowBuffer.Factory);
@@ -244,7 +244,7 @@ public class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
 		Assert.assertTrue(operator2 instanceof EvictingWindowOperator);
 		EvictingWindowOperator winOperator2 = (EvictingWindowOperator) operator2;
 		Assert.assertFalse(winOperator2.isSetProcessingTime());
-		Assert.assertTrue(winOperator2.getTriggerTemplate() instanceof CountTrigger);
+		Assert.assertTrue(winOperator2.getTrigger() instanceof CountTrigger);
 		Assert.assertTrue(winOperator2.getWindowAssigner() instanceof TumblingTimeWindows);
 		Assert.assertTrue(winOperator2.getEvictor() instanceof TimeEvictor);
 		Assert.assertTrue(winOperator2.getWindowBufferFactory() instanceof HeapWindowBuffer.Factory);

http://git-wip-us.apache.org/repos/asf/flink/blob/44422697/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java
index 950b0f5..60b7894 100644
--- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java
+++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.streaming.examples.windowing;
 
+import org.apache.flink.api.common.state.OperatorState;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.datastream.DataStream;
@@ -97,8 +98,7 @@ public class SessionWindowing {
 
 		private static final long serialVersionUID = 1L;
 
-		private volatile Long lastSeenEvent = 1L;
-		private Long sessionTimeout;
+		private final Long sessionTimeout;
 
 		public SessionTrigger(Long sessionTimeout) {
 			this.sessionTimeout = sessionTimeout;
@@ -106,13 +106,17 @@ public class SessionWindowing {
 		}
 
 		@Override
-		public TriggerResult onElement(Tuple3<String, Long, Integer> element, long timestamp, GlobalWindow window, TriggerContext ctx) {
-			Long timeSinceLastEvent = timestamp - lastSeenEvent;
+		public TriggerResult onElement(Tuple3<String, Long, Integer> element, long timestamp, GlobalWindow window, TriggerContext ctx) throws Exception {
+			// Read the time of the previously seen event from the "last-seen" key/value state.
+			OperatorState<Long> lastSeenState = ctx.getKeyValueState("last-seen", 1L);
+			Long lastSeen = lastSeenState.value();
+			Long timeSinceLastEvent = timestamp - lastSeen;
 
 			// Update the last seen event time
-			lastSeenEvent = timestamp;
+			lastSeenState.update(timestamp);
 
-			ctx.registerWatermarkTimer(lastSeenEvent + sessionTimeout);
+			// Time out relative to this element, as the old code did by updating lastSeenEvent first.
+			ctx.registerWatermarkTimer(timestamp + sessionTimeout);
 
 			if (timeSinceLastEvent > sessionTimeout) {
 				return TriggerResult.FIRE_AND_PURGE;
@@ -122,17 +126,15 @@ public class SessionWindowing {
 		}
 
 		@Override
-		public TriggerResult onTime(long time, TriggerContext ctx) {
-			if (time - lastSeenEvent >= sessionTimeout) {
+		public TriggerResult onTime(long time, TriggerContext ctx) throws Exception {
+			// Fire and purge once at least sessionTimeout lies between the stored last-seen time and 'time'.
+			OperatorState<Long> lastSeenState = ctx.getKeyValueState("last-seen", 1L);
+			Long lastSeen = lastSeenState.value();
+			if (time - lastSeen >= sessionTimeout) {
 				return TriggerResult.FIRE_AND_PURGE;
 			}
 			return TriggerResult.CONTINUE;
 		}
-
-		@Override
-		public SessionTrigger duplicate() {
-			return new SessionTrigger(sessionTimeout);
-		}
 	}
 
 	// *************************************************************************

http://git-wip-us.apache.org/repos/asf/flink/blob/44422697/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala
index 33104ab..0357144 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala
@@ -202,6 +202,58 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) {
     javaStream.apply(applyFunction, implicitly[TypeInformation[R]])
   }
 
+  /**
+   * Applies the given window function to each window. The window function is called for each
+   * evaluation of the window; unlike a keyed WindowFunction it is not invoked per key. The
+   * output of the window function is interpreted as a regular non-windowed stream.
+   *
+   * Arriving data is pre-aggregated using the given pre-aggregation reducer.
+   *
+   * @param preAggregator The reduce function that is used for pre-aggregation
+   * @param function The window function.
+   * @return The data stream that is the result of applying the window function to the window.
+   */
+  def apply[R: TypeInformation: ClassTag](
+      preAggregator: ReduceFunction[T],
+      function: AllWindowFunction[T, R, W]): DataStream[R] = {
+    javaStream.apply(clean(preAggregator), clean(function), implicitly[TypeInformation[R]])
+  }
+
+  /**
+   * Applies the given window function to each window. The window function is called for each
+   * evaluation of the window and receives the window, its elements and a collector, but no
+   * key. The output of the window function is interpreted as a regular non-windowed stream.
+   *
+   * Arriving data is pre-aggregated using the given pre-aggregation reducer.
+   *
+   * @param preAggregator The reduce function that is used for pre-aggregation; must not be null.
+   * @param function The window function; must not be null.
+   * @return The data stream that is the result of applying the window function to the window.
+   */
+  def apply[R: TypeInformation: ClassTag](
+      preAggregator: (T, T) => T,
+      function: (W, Iterable[T], Collector[R]) => Unit): DataStream[R] = {
+    if (preAggregator == null) {
+      throw new NullPointerException("Reduce function must not be null.")
+    }
+    if (function == null) {
+      throw new NullPointerException("WindowApply function must not be null.")
+    }
+
+    val cleanReducer = clean(preAggregator)
+    val reducer = new ReduceFunction[T] {
+      def reduce(v1: T, v2: T) = { cleanReducer(v1, v2) }
+    }
+
+    val cleanApply = clean(function)
+    val applyFunction = new AllWindowFunction[T, R, W] {
+      def apply(window: W, elements: java.lang.Iterable[T], out: Collector[R]): Unit = {
+        cleanApply(window, elements.asScala, out)
+      }
+    }
+    javaStream.apply(reducer, applyFunction, implicitly[TypeInformation[R]])
+  }
+
   // ------------------------------------------------------------------------
   //  Aggregations on the keyed windows
   // ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/44422697/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala
index d4f4618..93b91ff 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala
@@ -196,6 +196,10 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) {
    */
   def apply[R: TypeInformation: ClassTag](
       function: (K, W, Iterable[T], Collector[R]) => Unit): DataStream[R] = {
+    if (function == null) {
+      throw new NullPointerException("WindowApply function must not be null.")
+    }
+
     val cleanedFunction = clean(function)
     val applyFunction = new WindowFunction[T, R, K, W] {
       def apply(key: K, window: W, elements: java.lang.Iterable[T], out: Collector[R]): Unit = {
@@ -205,6 +209,58 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) {
     javaStream.apply(applyFunction, implicitly[TypeInformation[R]])
   }
 
+  /**
+   * Applies the given window function to each window. The window function is called for each
+   * evaluation of the window for each key individually. The output of the window function is
+   * interpreted as a regular non-windowed stream.
+   *
+   * Arriving data is pre-aggregated using the given reducer; both functions go through clean().
+   *
+   * @param preAggregator The reduce function that is used for pre-aggregation
+   * @param function The window function.
+   * @return The data stream that is the result of applying the window function to the window.
+   */
+  def apply[R: TypeInformation: ClassTag](
+      preAggregator: ReduceFunction[T],
+      function: WindowFunction[T, R, K, W]): DataStream[R] = {
+    javaStream.apply(clean(preAggregator), clean(function), implicitly[TypeInformation[R]])
+  }
+
+  /**
+   * Applies the given window function to each window. The window function is called for each
+   * evaluation of the window for each key individually. The output of the window function is
+   * interpreted as a regular non-windowed stream.
+   *
+   * Arriving data is pre-aggregated using the given pre-aggregation reducer.
+   *
+   * @param preAggregator The reduce function that is used for pre-aggregation; must not be null.
+   * @param function The window function; must not be null.
+   * @return The data stream that is the result of applying the window function to the window.
+   */
+  def apply[R: TypeInformation: ClassTag](
+      preAggregator: (T, T) => T,
+      function: (K, W, Iterable[T], Collector[R]) => Unit): DataStream[R] = {
+    if (preAggregator == null) {
+      throw new NullPointerException("Reduce function must not be null.")
+    }
+    if (function == null) {
+      throw new NullPointerException("WindowApply function must not be null.")
+    }
+
+    val cleanReducer = clean(preAggregator)
+    val reducer = new ReduceFunction[T] {
+      def reduce(v1: T, v2: T) = { cleanReducer(v1, v2) }
+    }
+
+    val cleanApply = clean(function)
+    val applyFunction = new WindowFunction[T, R, K, W] {
+      def apply(key: K, window: W, elements: java.lang.Iterable[T], out: Collector[R]): Unit = {
+        cleanApply(key, window, elements.asScala, out)
+      }
+    }
+    javaStream.apply(reducer, applyFunction, implicitly[TypeInformation[R]])
+  }
+
   // ------------------------------------------------------------------------
   //  Aggregations on the keyed windows
   // ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/44422697/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala
index 99fcd07..7da7bc3 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala
@@ -22,8 +22,9 @@ package org.apache.flink.streaming.api.scala
 import java.util.concurrent.TimeUnit
 
 import org.apache.flink.api.common.functions.RichReduceFunction
+import org.apache.flink.api.java.tuple.Tuple
 import org.apache.flink.streaming.api.TimeCharacteristic
-import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction
+import org.apache.flink.streaming.api.functions.windowing.{WindowFunction, AllWindowFunction}
 import org.apache.flink.streaming.api.transformations.OneInputTransformation
 import org.apache.flink.streaming.api.windowing.assigners.{TumblingTimeWindows, SlidingTimeWindows}
 import org.apache.flink.streaming.api.windowing.evictors.{CountEvictor, TimeEvictor}
@@ -111,7 +112,7 @@ class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase {
 
     assertTrue(operator1.isInstanceOf[NonKeyedWindowOperator[_, _, _]])
     val winOperator1 = operator1.asInstanceOf[NonKeyedWindowOperator[_, _, _]]
-    assertTrue(winOperator1.getTriggerTemplate.isInstanceOf[CountTrigger[_]])
+    assertTrue(winOperator1.getTrigger.isInstanceOf[CountTrigger[_]])
     assertTrue(winOperator1.getWindowAssigner.isInstanceOf[SlidingTimeWindows])
     assertTrue(
       winOperator1.getWindowBufferFactory.isInstanceOf[PreAggregatingHeapWindowBuffer.Factory[_]])
@@ -134,7 +135,7 @@ class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase {
 
     assertTrue(operator2.isInstanceOf[NonKeyedWindowOperator[_, _, _]])
     val winOperator2 = operator2.asInstanceOf[NonKeyedWindowOperator[_, _, _]]
-    assertTrue(winOperator2.getTriggerTemplate.isInstanceOf[CountTrigger[_]])
+    assertTrue(winOperator2.getTrigger.isInstanceOf[CountTrigger[_]])
     assertTrue(winOperator2.getWindowAssigner.isInstanceOf[TumblingTimeWindows])
     assertTrue(winOperator2.getWindowBufferFactory.isInstanceOf[HeapWindowBuffer.Factory[_]])
   }
@@ -161,7 +162,7 @@ class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase {
 
     assertTrue(operator1.isInstanceOf[EvictingNonKeyedWindowOperator[_, _, _]])
     val winOperator1 = operator1.asInstanceOf[EvictingNonKeyedWindowOperator[_, _, _]]
-    assertTrue(winOperator1.getTriggerTemplate.isInstanceOf[ProcessingTimeTrigger])
+    assertTrue(winOperator1.getTrigger.isInstanceOf[ProcessingTimeTrigger])
     assertTrue(winOperator1.getEvictor.isInstanceOf[TimeEvictor[_]])
     assertTrue(winOperator1.getWindowAssigner.isInstanceOf[SlidingTimeWindows])
     assertTrue(winOperator1.getWindowBufferFactory.isInstanceOf[HeapWindowBuffer.Factory[_]])
@@ -185,11 +186,72 @@ class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase {
 
     assertTrue(operator2.isInstanceOf[EvictingNonKeyedWindowOperator[_, _, _]])
     val winOperator2 = operator2.asInstanceOf[EvictingNonKeyedWindowOperator[_, _, _]]
-    assertTrue(winOperator2.getTriggerTemplate.isInstanceOf[CountTrigger[_]])
+    assertTrue(winOperator2.getTrigger.isInstanceOf[CountTrigger[_]])
     assertTrue(winOperator2.getEvictor.isInstanceOf[CountEvictor[_]])
     assertTrue(winOperator2.getWindowAssigner.isInstanceOf[TumblingTimeWindows])
     assertTrue(winOperator2.getWindowBufferFactory.isInstanceOf[HeapWindowBuffer.Factory[_]])
   }
+
+  @Test
+  def testPreReduce(): Unit = {
+    // Checks that apply(reducer, windowFunction) yields a WindowOperator that uses a
+    // PreAggregatingHeapWindowBuffer.Factory, for sliding and for tumbling time windows.
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val source = env.fromElements(("hello", 1), ("hello", 2))
+    val reducer = new DummyReducer
+
+    val window1 = source
+      .keyBy(0)
+      .window(SlidingTimeWindows.of(
+        Time.of(1, TimeUnit.SECONDS),
+        Time.of(100, TimeUnit.MILLISECONDS)))
+      .trigger(CountTrigger.of(100))
+      .apply(reducer, new WindowFunction[(String, Int), (String, Int), Tuple, TimeWindow]() {
+        def apply(
+                   tuple: Tuple,
+                   window: TimeWindow,
+                   values: java.lang.Iterable[(String, Int)],
+                   out: Collector[(String, Int)]) { }
+      })
+
+    val transform1 = window1.getJavaStream.getTransformation
+      .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
+
+    val operator1 = transform1.getOperator
+
+    assertTrue(operator1.isInstanceOf[WindowOperator[_, _, _, _]])
+    val winOperator1 = operator1.asInstanceOf[WindowOperator[_, _, _, _]]
+    assertTrue(winOperator1.getTrigger.isInstanceOf[CountTrigger[_]])
+    assertTrue(winOperator1.getWindowAssigner.isInstanceOf[SlidingTimeWindows])
+    assertTrue(
+      winOperator1.getWindowBufferFactory.isInstanceOf[PreAggregatingHeapWindowBuffer.Factory[_]])
+
+    // Tumbling case. NOTE(review): this AllWindow test uses keyBy(0).window(...) - confirm.
+    val window2 = source
+      .keyBy(0)
+      .window(TumblingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
+      .trigger(CountTrigger.of(100))
+      .apply(reducer, new WindowFunction[(String, Int), (String, Int), Tuple, TimeWindow]() {
+        def apply(
+                   tuple: Tuple,
+                   window: TimeWindow,
+                   values: java.lang.Iterable[(String, Int)],
+                   out: Collector[(String, Int)]) { }
+      })
+
+    val transform2 = window2.getJavaStream.getTransformation
+      .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
+
+    val operator2 = transform2.getOperator
+
+    assertTrue(operator2.isInstanceOf[WindowOperator[_, _, _, _]])
+    val winOperator2 = operator2.asInstanceOf[WindowOperator[_, _, _, _]]
+    assertTrue(winOperator2.getTrigger.isInstanceOf[CountTrigger[_]])
+    assertTrue(winOperator2.getWindowAssigner.isInstanceOf[TumblingTimeWindows])
+    assertTrue(
+      winOperator2.getWindowBufferFactory.isInstanceOf[PreAggregatingHeapWindowBuffer.Factory[_]])
+  }
+
 }
 
 // ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/44422697/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala
index 65f978c..46981ab 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala
@@ -108,7 +108,7 @@ class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
 
     assertTrue(operator1.isInstanceOf[WindowOperator[_, _, _, _]])
     val winOperator1 = operator1.asInstanceOf[WindowOperator[_, _, _, _]]
-    assertTrue(winOperator1.getTriggerTemplate.isInstanceOf[CountTrigger[_]])
+    assertTrue(winOperator1.getTrigger.isInstanceOf[CountTrigger[_]])
     assertTrue(winOperator1.getWindowAssigner.isInstanceOf[SlidingTimeWindows])
     assertTrue(
       winOperator1.getWindowBufferFactory.isInstanceOf[PreAggregatingHeapWindowBuffer.Factory[_]])
@@ -133,7 +133,7 @@ class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
 
     assertTrue(operator2.isInstanceOf[WindowOperator[_, _, _, _]])
     val winOperator2 = operator2.asInstanceOf[WindowOperator[_, _, _, _]]
-    assertTrue(winOperator2.getTriggerTemplate.isInstanceOf[CountTrigger[_]])
+    assertTrue(winOperator2.getTrigger.isInstanceOf[CountTrigger[_]])
     assertTrue(winOperator2.getWindowAssigner.isInstanceOf[TumblingTimeWindows])
     assertTrue(winOperator2.getWindowBufferFactory.isInstanceOf[HeapWindowBuffer.Factory[_]])
   }
@@ -161,7 +161,7 @@ class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
 
     assertTrue(operator1.isInstanceOf[EvictingWindowOperator[_, _, _, _]])
     val winOperator1 = operator1.asInstanceOf[EvictingWindowOperator[_, _, _, _]]
-    assertTrue(winOperator1.getTriggerTemplate.isInstanceOf[ProcessingTimeTrigger])
+    assertTrue(winOperator1.getTrigger.isInstanceOf[ProcessingTimeTrigger])
     assertTrue(winOperator1.getEvictor.isInstanceOf[TimeEvictor[_]])
     assertTrue(winOperator1.getWindowAssigner.isInstanceOf[SlidingTimeWindows])
     assertTrue(winOperator1.getWindowBufferFactory.isInstanceOf[HeapWindowBuffer.Factory[_]])
@@ -187,9 +187,69 @@ class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
 
     assertTrue(operator2.isInstanceOf[EvictingWindowOperator[_, _, _, _]])
     val winOperator2 = operator2.asInstanceOf[EvictingWindowOperator[_, _, _, _]]
-    assertTrue(winOperator2.getTriggerTemplate.isInstanceOf[CountTrigger[_]])
+    assertTrue(winOperator2.getTrigger.isInstanceOf[CountTrigger[_]])
     assertTrue(winOperator2.getEvictor.isInstanceOf[CountEvictor[_]])
     assertTrue(winOperator2.getWindowAssigner.isInstanceOf[TumblingTimeWindows])
     assertTrue(winOperator2.getWindowBufferFactory.isInstanceOf[HeapWindowBuffer.Factory[_]])
   }
+
+  @Test
+  def testPreReduce(): Unit = {
+    // Checks that apply(reducer, windowFunction) yields a WindowOperator that uses a
+    // PreAggregatingHeapWindowBuffer.Factory, for sliding and for tumbling time windows.
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val source = env.fromElements(("hello", 1), ("hello", 2))
+    val reducer = new DummyReducer
+
+    val window1 = source
+      .keyBy(0)
+      .window(SlidingTimeWindows.of(
+        Time.of(1, TimeUnit.SECONDS),
+        Time.of(100, TimeUnit.MILLISECONDS)))
+      .trigger(CountTrigger.of(100))
+      .apply(reducer, new WindowFunction[(String, Int), (String, Int), Tuple, TimeWindow]() {
+        def apply(
+                   tuple: Tuple,
+                   window: TimeWindow,
+                   values: java.lang.Iterable[(String, Int)],
+                   out: Collector[(String, Int)]) { }
+      })
+
+    val transform1 = window1.getJavaStream.getTransformation
+      .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
+
+    val operator1 = transform1.getOperator
+
+    assertTrue(operator1.isInstanceOf[WindowOperator[_, _, _, _]])
+    val winOperator1 = operator1.asInstanceOf[WindowOperator[_, _, _, _]]
+    assertTrue(winOperator1.getTrigger.isInstanceOf[CountTrigger[_]])
+    assertTrue(winOperator1.getWindowAssigner.isInstanceOf[SlidingTimeWindows])
+    assertTrue(
+      winOperator1.getWindowBufferFactory.isInstanceOf[PreAggregatingHeapWindowBuffer.Factory[_]])
+
+    // Same checks for tumbling time windows.
+    val window2 = source
+      .keyBy(0)
+      .window(TumblingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
+      .trigger(CountTrigger.of(100))
+      .apply(reducer, new WindowFunction[(String, Int), (String, Int), Tuple, TimeWindow]() {
+        def apply(
+                   tuple: Tuple,
+                   window: TimeWindow,
+                   values: java.lang.Iterable[(String, Int)],
+                   out: Collector[(String, Int)]) { }
+      })
+
+    val transform2 = window2.getJavaStream.getTransformation
+      .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
+
+    val operator2 = transform2.getOperator
+
+    assertTrue(operator2.isInstanceOf[WindowOperator[_, _, _, _]])
+    val winOperator2 = operator2.asInstanceOf[WindowOperator[_, _, _, _]]
+    assertTrue(winOperator2.getTrigger.isInstanceOf[CountTrigger[_]])
+    assertTrue(winOperator2.getWindowAssigner.isInstanceOf[TumblingTimeWindows])
+    assertTrue(
+      winOperator2.getWindowBufferFactory.isInstanceOf[PreAggregatingHeapWindowBuffer.Factory[_]])
+  }
 }


Mime
View raw message