flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aljos...@apache.org
Subject [05/11] flink git commit: [FLINK-4460] Provide late-data output for window operations
Date Sat, 18 Mar 2017 07:13:44 GMT
[FLINK-4460] Provide late-data output for window operations

We use side outputs to emit dropped late data.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/07a15d0e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/07a15d0e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/07a15d0e

Branch: refs/heads/master
Commit: 07a15d0e1647c79ae010ca6df5b1830a4087dd56
Parents: e134d27
Author: Chen Qin <qinnchen@gmail.com>
Authored: Wed Mar 1 15:36:17 2017 +0100
Committer: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Committed: Sat Mar 18 07:44:17 2017 +0100

----------------------------------------------------------------------
 .../api/datastream/AllWindowedStream.java       |  79 ++++++--
 .../api/datastream/WindowedStream.java          |  75 +++++--
 .../windowing/EvictingWindowOperator.java       |  10 +-
 .../operators/windowing/WindowOperator.java     |  54 ++++-
 .../windowing/EvictingWindowOperatorTest.java   |  27 ++-
 .../windowing/WindowOperatorContractTest.java   |   3 +-
 .../windowing/WindowOperatorMigrationTest.java  |  20 +-
 .../operators/windowing/WindowOperatorTest.java | 202 +++++++++++++------
 .../streaming/api/scala/AllWindowedStream.scala |  16 +-
 .../streaming/api/scala/WindowedStream.scala    |  18 +-
 .../streaming/runtime/SideOutputITCase.java     | 173 ++++++++++++++++
 11 files changed, 553 insertions(+), 124 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/07a15d0e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
index a45cb0a..50f0f85 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
@@ -63,6 +63,8 @@ import org.apache.flink.streaming.runtime.operators.windowing.functions.Internal
 import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalWindowFunction;
 import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.OutputTag;
+import org.apache.flink.util.Preconditions;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -106,6 +108,12 @@ public class AllWindowedStream<T, W extends Window> {
 	/** The user-specified allowed lateness. */
 	private long allowedLateness = 0L;
 
+	/**
+	 * Side output {@code OutputTag} for late data. If no tag is set late data will simply be
+	 * dropped.
+	 */
+	private OutputTag<T> lateDataOutputTag;
+
 	@PublicEvolving
 	public AllWindowedStream(DataStream<T> input,
 			WindowAssigner<? super T, W> windowAssigner) {
@@ -144,6 +152,23 @@ public class AllWindowedStream<T, W extends Window> {
 	}
 
 	/**
+	 * Send late arriving data to the side output identified by the given {@link OutputTag}. Data
+	 * is considered late after the watermark has passed the end of the window plus the allowed
+	 * lateness set using {@link #allowedLateness(Time)}.
+	 *
+	 * <p>You can get the stream of late data using
+	 * {@link SingleOutputStreamOperator#getSideOutput(OutputTag)} on the
+	 * {@link SingleOutputStreamOperator} resulting from the windowed operation
+	 * with the same {@link OutputTag}.
+	 */
+	@PublicEvolving
+	public AllWindowedStream<T, W> sideOutputLateData(OutputTag<T> outputTag) {
+		Preconditions.checkNotNull(outputTag, "Side output tag must not be null.");
+		this.lateDataOutputTag = input.getExecutionEnvironment().clean(outputTag);
+		return this;
+	}
+
+	/**
 	 * Sets the {@code Evictor} that should be used to evict elements from a window before emission.
 	 *
 	 * <p>
@@ -271,7 +296,8 @@ public class AllWindowedStream<T, W extends Window> {
 					new InternalIterableAllWindowFunction<>(new ReduceApplyAllWindowFunction<>(reduceFunction, function)),
 					trigger,
 					evictor,
-					allowedLateness);
+					allowedLateness,
+					lateDataOutputTag);
 
 		} else {
 			ReducingStateDescriptor<T> stateDesc = new ReducingStateDescriptor<>("window-contents",
@@ -288,7 +314,8 @@ public class AllWindowedStream<T, W extends Window> {
 					stateDesc,
 					new InternalSingleValueAllWindowFunction<>(function),
 					trigger,
-					allowedLateness);
+					allowedLateness,
+					lateDataOutputTag);
 		}
 
 		return input.transform(opName, resultType, operator).forceNonParallel();
@@ -367,7 +394,8 @@ public class AllWindowedStream<T, W extends Window> {
 					new InternalIterableProcessAllWindowFunction<>(new ReduceApplyProcessAllWindowFunction<>(reduceFunction, function)),
 					trigger,
 					evictor,
-					allowedLateness);
+					allowedLateness,
+					lateDataOutputTag);
 
 		} else {
 			ReducingStateDescriptor<T> stateDesc = new ReducingStateDescriptor<>("window-contents",
@@ -384,7 +412,8 @@ public class AllWindowedStream<T, W extends Window> {
 					stateDesc,
 					new InternalSingleValueProcessAllWindowFunction<>(function),
 					trigger,
-					allowedLateness);
+					allowedLateness,
+					lateDataOutputTag);
 		}
 
 		return input.transform(opName, resultType, operator).forceNonParallel();
@@ -562,7 +591,8 @@ public class AllWindowedStream<T, W extends Window> {
 									new AggregateApplyAllWindowFunction<>(aggregateFunction, windowFunction)),
 							trigger,
 							evictor,
-							allowedLateness);
+							allowedLateness,
+							lateDataOutputTag);
 
 		} else {
 			AggregatingStateDescriptor<T, ACC, V> stateDesc = new AggregatingStateDescriptor<>(
@@ -580,7 +610,8 @@ public class AllWindowedStream<T, W extends Window> {
 							stateDesc,
 							new InternalSingleValueAllWindowFunction<>(windowFunction),
 							trigger,
-							allowedLateness);
+							allowedLateness,
+							lateDataOutputTag);
 		}
 
 		return input.transform(opName, resultType, operator).forceNonParallel();
@@ -693,7 +724,8 @@ public class AllWindowedStream<T, W extends Window> {
 					new InternalAggregateProcessAllWindowFunction<>(aggregateFunction, windowFunction),
 					trigger,
 					evictor,
-					allowedLateness);
+					allowedLateness,
+					lateDataOutputTag);
 
 		} else {
 			AggregatingStateDescriptor<T, ACC, V> stateDesc = new AggregatingStateDescriptor<>(
@@ -711,7 +743,8 @@ public class AllWindowedStream<T, W extends Window> {
 					stateDesc,
 					new InternalSingleValueProcessAllWindowFunction<>(windowFunction),
 					trigger,
-					allowedLateness);
+					allowedLateness,
+					lateDataOutputTag);
 		}
 
 		return input.transform(opName, resultType, operator).forceNonParallel();
@@ -842,7 +875,8 @@ public class AllWindowedStream<T, W extends Window> {
 					new InternalIterableAllWindowFunction<>(new FoldApplyAllWindowFunction<>(initialValue, foldFunction, function, foldAccumulatorType)),
 					trigger,
 					evictor,
-					allowedLateness);
+					allowedLateness,
+					lateDataOutputTag);
 
 		} else {
 			FoldingStateDescriptor<T, ACC> stateDesc = new FoldingStateDescriptor<>("window-contents",
@@ -858,7 +892,8 @@ public class AllWindowedStream<T, W extends Window> {
 					stateDesc,
 					new InternalSingleValueAllWindowFunction<>(function),
 					trigger,
-					allowedLateness);
+					allowedLateness,
+					lateDataOutputTag);
 		}
 
 		return input.transform(opName, resultType, operator).forceNonParallel();
@@ -948,7 +983,8 @@ public class AllWindowedStream<T, W extends Window> {
 					new InternalIterableProcessAllWindowFunction<>(new FoldApplyProcessAllWindowFunction<>(initialValue, foldFunction, function, foldAccumulatorType)),
 					trigger,
 					evictor,
-					allowedLateness);
+					allowedLateness,
+					lateDataOutputTag);
 
 		} else {
 			FoldingStateDescriptor<T, ACC> stateDesc = new FoldingStateDescriptor<>("window-contents",
@@ -964,7 +1000,8 @@ public class AllWindowedStream<T, W extends Window> {
 					stateDesc,
 					new InternalSingleValueProcessAllWindowFunction<>(function),
 					trigger,
-					allowedLateness);
+					allowedLateness,
+					lateDataOutputTag);
 		}
 
 		return input.transform(opName, resultType, operator).forceNonParallel();
@@ -1080,7 +1117,8 @@ public class AllWindowedStream<T, W extends Window> {
 					function,
 					trigger,
 					evictor,
-					allowedLateness);
+					allowedLateness,
+					lateDataOutputTag);
 
 		} else {
 			ListStateDescriptor<T> stateDesc = new ListStateDescriptor<>("window-contents",
@@ -1096,7 +1134,8 @@ public class AllWindowedStream<T, W extends Window> {
 					stateDesc,
 					function,
 					trigger,
-					allowedLateness);
+					allowedLateness,
+					lateDataOutputTag);
 		}
 
 		return input.transform(opName, resultType, operator).forceNonParallel();
@@ -1177,7 +1216,8 @@ public class AllWindowedStream<T, W extends Window> {
 					new InternalIterableAllWindowFunction<>(new ReduceApplyAllWindowFunction<>(reduceFunction, function)),
 					trigger,
 					evictor,
-					allowedLateness);
+					allowedLateness,
+					lateDataOutputTag);
 
 		} else {
 			ReducingStateDescriptor<T> stateDesc = new ReducingStateDescriptor<>("window-contents",
@@ -1194,7 +1234,8 @@ public class AllWindowedStream<T, W extends Window> {
 					stateDesc,
 					new InternalSingleValueAllWindowFunction<>(function),
 					trigger,
-					allowedLateness);
+					allowedLateness,
+					lateDataOutputTag);
 		}
 
 		return input.transform(opName, resultType, operator).forceNonParallel();
@@ -1280,7 +1321,8 @@ public class AllWindowedStream<T, W extends Window> {
 					new InternalIterableAllWindowFunction<>(new FoldApplyAllWindowFunction<>(initialValue, foldFunction, function, resultType)),
 					trigger,
 					evictor,
-					allowedLateness);
+					allowedLateness,
+					lateDataOutputTag);
 
 		} else {
 			FoldingStateDescriptor<T, R> stateDesc = new FoldingStateDescriptor<>("window-contents",
@@ -1296,7 +1338,8 @@ public class AllWindowedStream<T, W extends Window> {
 					stateDesc,
 					new InternalSingleValueAllWindowFunction<>(function),
 					trigger,
-					allowedLateness);
+					allowedLateness,
+					lateDataOutputTag);
 		}
 
 		return input.transform(opName, resultType, operator).forceNonParallel();

http://git-wip-us.apache.org/repos/asf/flink/blob/07a15d0e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
index 164e47e..334851e 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
@@ -73,6 +73,8 @@ import org.apache.flink.streaming.runtime.operators.windowing.WindowOperator;
 import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalWindowFunction;
 import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.OutputTag;
+import org.apache.flink.util.Preconditions;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -120,6 +122,12 @@ public class WindowedStream<T, K, W extends Window> {
 	/** The user-specified allowed lateness. */
 	private long allowedLateness = 0L;
 
+	/**
+	 * Side output {@code OutputTag} for late data. If no tag is set late data will simply be
+	 * dropped.
+ 	 */
+	private OutputTag<T> lateDataOutputTag;
+
 	@PublicEvolving
 	public WindowedStream(KeyedStream<T, K> input,
 			WindowAssigner<? super T, W> windowAssigner) {
@@ -162,6 +170,23 @@ public class WindowedStream<T, K, W extends Window> {
 	}
 
 	/**
+	 * Send late arriving data to the side output identified by the given {@link OutputTag}. Data
+	 * is considered late after the watermark has passed the end of the window plus the allowed
+	 * lateness set using {@link #allowedLateness(Time)}.
+	 *
+	 * <p>You can get the stream of late data using
+	 * {@link SingleOutputStreamOperator#getSideOutput(OutputTag)} on the
+	 * {@link SingleOutputStreamOperator} resulting from the windowed operation
+	 * with the same {@link OutputTag}.
+	 */
+	@PublicEvolving
+	public WindowedStream<T, K, W> sideOutputLateData(OutputTag<T> outputTag) {
+		Preconditions.checkNotNull(outputTag, "Side output tag must not be null.");
+		this.lateDataOutputTag = input.getExecutionEnvironment().clean(outputTag);
+		return this;
+	}
+
+	/**
 	 * Sets the {@code Evictor} that should be used to evict elements from a window before emission.
 	 *
 	 * <p>
@@ -344,7 +369,8 @@ public class WindowedStream<T, K, W extends Window> {
 					new InternalIterableWindowFunction<>(new ReduceApplyWindowFunction<>(reduceFunction, function)),
 					trigger,
 					evictor,
-					allowedLateness);
+					allowedLateness,
+					lateDataOutputTag);
 
 		} else {
 			ReducingStateDescriptor<T> stateDesc = new ReducingStateDescriptor<>("window-contents",
@@ -362,6 +388,7 @@ public class WindowedStream<T, K, W extends Window> {
 					new InternalSingleValueWindowFunction<>(function),
 					trigger,
 					allowedLateness,
+					lateDataOutputTag,
 					legacyWindowOpType);
 		}
 
@@ -437,7 +464,8 @@ public class WindowedStream<T, K, W extends Window> {
 							new InternalIterableProcessWindowFunction<>(new ReduceApplyProcessWindowFunction<>(reduceFunction, function)),
 							trigger,
 							evictor,
-							allowedLateness);
+							allowedLateness,
+							lateDataOutputTag);
 
 		} else {
 			ReducingStateDescriptor<T> stateDesc = new ReducingStateDescriptor<>("window-contents",
@@ -454,7 +482,8 @@ public class WindowedStream<T, K, W extends Window> {
 							stateDesc,
 							new InternalSingleValueProcessWindowFunction<>(function),
 							trigger,
-							allowedLateness);
+							allowedLateness,
+							lateDataOutputTag);
 		}
 
 		return input.transform(opName, resultType, operator);
@@ -589,7 +618,8 @@ public class WindowedStream<T, K, W extends Window> {
 				new InternalIterableWindowFunction<>(new FoldApplyWindowFunction<>(initialValue, foldFunction, function, foldAccumulatorType)),
 				trigger,
 				evictor,
-				allowedLateness);
+				allowedLateness,
+				lateDataOutputTag);
 
 		} else {
 			FoldingStateDescriptor<T, ACC> stateDesc = new FoldingStateDescriptor<>("window-contents",
@@ -604,7 +634,8 @@ public class WindowedStream<T, K, W extends Window> {
 				stateDesc,
 				new InternalSingleValueWindowFunction<>(function),
 				trigger,
-				allowedLateness);
+				allowedLateness,
+				lateDataOutputTag);
 		}
 
 		return input.transform(opName, resultType, operator);
@@ -697,7 +728,8 @@ public class WindowedStream<T, K, W extends Window> {
 							new InternalIterableProcessWindowFunction<>(new FoldApplyProcessWindowFunction<>(initialValue, foldFunction, windowFunction, foldResultType)),
 							trigger,
 							evictor,
-							allowedLateness);
+							allowedLateness,
+							lateDataOutputTag);
 
 		} else {
 			FoldingStateDescriptor<T, ACC> stateDesc = new FoldingStateDescriptor<>("window-contents",
@@ -715,7 +747,8 @@ public class WindowedStream<T, K, W extends Window> {
 							stateDesc,
 							new InternalSingleValueProcessWindowFunction<>(windowFunction),
 							trigger,
-							allowedLateness);
+							allowedLateness,
+							lateDataOutputTag);
 		}
 
 		return input.transform(opName, windowResultType, operator);
@@ -890,7 +923,8 @@ public class WindowedStream<T, K, W extends Window> {
 					new InternalIterableWindowFunction<>(new AggregateApplyWindowFunction<>(aggregateFunction, windowFunction)),
 					trigger,
 					evictor,
-					allowedLateness);
+					allowedLateness,
+					lateDataOutputTag);
 
 		} else {
 			AggregatingStateDescriptor<T, ACC, V> stateDesc = new AggregatingStateDescriptor<>("window-contents",
@@ -905,7 +939,8 @@ public class WindowedStream<T, K, W extends Window> {
 					stateDesc,
 					new InternalSingleValueWindowFunction<>(windowFunction),
 					trigger,
-					allowedLateness);
+					allowedLateness,
+					lateDataOutputTag);
 		}
 
 		return input.transform(opName, resultType, operator);
@@ -1017,7 +1052,8 @@ public class WindowedStream<T, K, W extends Window> {
 					new InternalAggregateProcessWindowFunction<>(aggregateFunction, windowFunction),
 					trigger,
 					evictor,
-					allowedLateness);
+					allowedLateness,
+					lateDataOutputTag);
 
 		} else {
 			AggregatingStateDescriptor<T, ACC, V> stateDesc = new AggregatingStateDescriptor<>("window-contents",
@@ -1032,7 +1068,8 @@ public class WindowedStream<T, K, W extends Window> {
 					stateDesc,
 					new InternalSingleValueProcessWindowFunction<>(windowFunction),
 					trigger,
-					allowedLateness);
+					allowedLateness,
+					lateDataOutputTag);
 		}
 
 		return input.transform(opName, resultType, operator);
@@ -1154,7 +1191,8 @@ public class WindowedStream<T, K, W extends Window> {
 					function,
 					trigger,
 					evictor,
-					allowedLateness);
+					allowedLateness,
+					lateDataOutputTag);
 
 		} else {
 			ListStateDescriptor<T> stateDesc = new ListStateDescriptor<>("window-contents",
@@ -1171,6 +1209,7 @@ public class WindowedStream<T, K, W extends Window> {
 					function,
 					trigger,
 					allowedLateness,
+					lateDataOutputTag,
 					legacyWindowOpType);
 		}
 
@@ -1252,7 +1291,8 @@ public class WindowedStream<T, K, W extends Window> {
 					new InternalIterableWindowFunction<>(new ReduceApplyWindowFunction<>(reduceFunction, function)),
 					trigger,
 					evictor,
-					allowedLateness);
+					allowedLateness,
+					lateDataOutputTag);
 
 		} else {
 			ReducingStateDescriptor<T> stateDesc = new ReducingStateDescriptor<>("window-contents",
@@ -1269,7 +1309,8 @@ public class WindowedStream<T, K, W extends Window> {
 					stateDesc,
 					new InternalSingleValueWindowFunction<>(function),
 					trigger,
-					allowedLateness);
+					allowedLateness,
+					lateDataOutputTag);
 		}
 
 		return input.transform(opName, resultType, operator);
@@ -1354,7 +1395,8 @@ public class WindowedStream<T, K, W extends Window> {
 				new InternalIterableWindowFunction<>(new FoldApplyWindowFunction<>(initialValue, foldFunction, function, resultType)),
 				trigger,
 				evictor,
-				allowedLateness);
+				allowedLateness,
+				lateDataOutputTag);
 
 		} else {
 			FoldingStateDescriptor<T, R> stateDesc = new FoldingStateDescriptor<>("window-contents",
@@ -1369,7 +1411,8 @@ public class WindowedStream<T, K, W extends Window> {
 				stateDesc,
 				new InternalSingleValueWindowFunction<>(function),
 				trigger,
-				allowedLateness);
+				allowedLateness,
+				lateDataOutputTag);
 		}
 
 		return input.transform(opName, resultType, operator);

http://git-wip-us.apache.org/repos/asf/flink/blob/07a15d0e/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
index 45fea14..8dfc717 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
@@ -39,6 +39,7 @@ import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
 import org.apache.flink.streaming.api.windowing.windows.Window;
 import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalWindowFunction;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.OutputTag;
 
 import java.util.Collection;
 
@@ -86,10 +87,11 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window>
 			InternalWindowFunction<Iterable<IN>, OUT, K, W> windowFunction,
 			Trigger<? super IN, ? super W> trigger,
 			Evictor<? super IN, ? super W> evictor,
-			long allowedLateness) {
+			long allowedLateness,
+			OutputTag<IN> lateDataOutputTag) {
 
 		super(windowAssigner, windowSerializer, keySelector,
-			keySerializer, null, windowFunction, trigger, allowedLateness);
+			keySerializer, null, windowFunction, trigger, allowedLateness, lateDataOutputTag);
 
 		this.evictor = checkNotNull(evictor);
 		this.evictingWindowStateDescriptor = checkNotNull(windowStateDescriptor);
@@ -137,7 +139,7 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window>
 						});
 
 				// check if the window is already inactive
-				if (isLate(actualWindow)) {
+				if (isWindowLate(actualWindow)) {
 					mergingWindows.retireWindow(actualWindow);
 					continue;
 				}
@@ -177,7 +179,7 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window>
 			for (W window : elementWindows) {
 
 				// check if the window is already inactive
-				if (isLate(window)) {
+				if (isWindowLate(window)) {
 					continue;
 				}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/07a15d0e/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
index 3c4f397..9ce1ae7 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
@@ -29,6 +29,7 @@ import org.apache.flink.api.common.state.State;
 import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.util.OutputTag;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
@@ -132,6 +133,13 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 	 */
 	private final long allowedLateness;
 
+	/**
+	 * {@link OutputTag} to use for late arriving events. Elements for which
+	 * {@code window.maxTimestamp + allowedLateness} is smaller than the current watermark will
+	 * be emitted to this.
+	 */
+	private final OutputTag<IN> lateDataOutputTag;
+
 	// ------------------------------------------------------------------------
 	// State that is not checkpointed
 	// ------------------------------------------------------------------------
@@ -200,10 +208,11 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 			StateDescriptor<? extends AppendingState<IN, ACC>, ?> windowStateDescriptor,
 			InternalWindowFunction<ACC, OUT, K, W> windowFunction,
 			Trigger<? super IN, ? super W> trigger,
-			long allowedLateness) {
+			long allowedLateness,
+			OutputTag<IN> lateDataOutputTag) {
 
 		this(windowAssigner, windowSerializer, keySelector, keySerializer,
-			windowStateDescriptor, windowFunction, trigger, allowedLateness, LegacyWindowOperatorType.NONE);
+			windowStateDescriptor, windowFunction, trigger, allowedLateness, lateDataOutputTag, LegacyWindowOperatorType.NONE);
 	}
 
 	/**
@@ -218,6 +227,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 			InternalWindowFunction<ACC, OUT, K, W> windowFunction,
 			Trigger<? super IN, ? super W> trigger,
 			long allowedLateness,
+			OutputTag<IN> lateDataOutputTag,
 			LegacyWindowOperatorType legacyWindowOperatorType) {
 
 		super(windowFunction);
@@ -239,6 +249,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 		this.windowStateDescriptor = windowStateDescriptor;
 		this.trigger = checkNotNull(trigger);
 		this.allowedLateness = allowedLateness;
+		this.lateDataOutputTag = lateDataOutputTag;
 		this.legacyWindowOperatorType = legacyWindowOperatorType;
 
 		setChainingStrategy(ChainingStrategy.ALWAYS);
@@ -323,6 +334,9 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 		final Collection<W> elementWindows = windowAssigner.assignWindows(
 			element.getValue(), element.getTimestamp(), windowAssignerContext);
 
+		// true if the element is handled by none of the assigned elementWindows
+		boolean isSkippedElement = true;
+
 		final K key = this.<K>getKeyedStateBackend().getCurrentKey();
 
 		if (windowAssigner instanceof MergingWindowAssigner) {
@@ -355,10 +369,11 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 				});
 
 				// drop if the window is already late
-				if (isLate(actualWindow)) {
+				if (isWindowLate(actualWindow)) {
 					mergingWindows.retireWindow(actualWindow);
 					continue;
 				}
+				isSkippedElement = false;
 
 				W stateWindow = mergingWindows.getStateWindow(actualWindow);
 				if (stateWindow == null) {
@@ -393,9 +408,10 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 			for (W window: elementWindows) {
 
 				// drop if the window is already late
-				if (isLate(window)) {
+				if (isWindowLate(window)) {
 					continue;
 				}
+				isSkippedElement = false;
 
 				windowState.setCurrentNamespace(window);
 				windowState.add(element.getValue());
@@ -419,6 +435,14 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 				registerCleanupTimer(window);
 			}
 		}
+
+		// side output the input element if all of the following hold:
+		//  - the element was not handled by any window
+		//  - a late-data output tag has been set
+		//  - the windowAssigner is event time and element timestamp + allowed lateness is not greater than the current watermark
+		if (isSkippedElement && lateDataOutputTag != null && isElementLate(element)) {
+			sideOutput(element);
+		}
 	}
 
 	@Override
@@ -546,6 +570,15 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 	}
 
 	/**
+	 * Writes a skipped, late-arriving element to the side output identified by the late-data output tag.
+	 *
+	 * @param element skipped late arriving element to side output
+	 */
+	private void sideOutput(StreamRecord<IN> element){
+		output.collect(lateDataOutputTag, element);
+	}
+
+	/**
 	 * Retrieves the {@link MergingWindowSet} for the currently active key.
 	 * The caller must ensure that the correct key is set in the state backend.
 	 *
@@ -562,11 +595,22 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 	 * Returns {@code true} if the watermark is after the end timestamp plus the allowed lateness
 	 * of the given window.
 	 */
-	protected boolean isLate(W window) {
+	protected boolean isWindowLate(W window) {
 		return (windowAssigner.isEventTime() && (cleanupTime(window) <= internalTimerService.currentWatermark()));
 	}
 
 	/**
+	 * Decide if a record is currently late, based on current watermark and allowed lateness.
+	 *
+	 * @param element The element to check
+	 * @return {@code true} if the element is late and should be considered for side output
+	 */
+	protected boolean isElementLate(StreamRecord<IN> element){
+		return (windowAssigner.isEventTime()) &&
+			(element.getTimestamp() + allowedLateness <= internalTimerService.currentWatermark());
+	}
+
+	/**
 	 * Registers a timer to cleanup the content of the window.
 	 * @param window
 	 * 					the window whose state to discard

http://git-wip-us.apache.org/repos/asf/flink/blob/07a15d0e/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java
index 8da1d7c..e9d63de 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java
@@ -89,7 +89,8 @@ public class EvictingWindowOperatorTest {
 			new InternalIterableWindowFunction<>(new RichSumReducer<GlobalWindow>(closeCalled)),
 			CountTrigger.of(TRIGGER_COUNT),
 			CountEvictor.of(WINDOW_SIZE,EVICT_AFTER),
-			0);
+			0,
+			null /* late data output tag */);
 
 
 		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
@@ -169,7 +170,8 @@ public class EvictingWindowOperatorTest {
 			new InternalIterableWindowFunction<>(new RichSumReducer<GlobalWindow>(closeCalled)),
 			CountTrigger.of(TRIGGER_COUNT),
 			TimeEvictor.of(Time.seconds(2), EVICT_AFTER),
-			0);
+			0,
+			null /* late data output tag */);
 
 
 		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
@@ -243,7 +245,8 @@ public class EvictingWindowOperatorTest {
 			new InternalIterableWindowFunction<>(new RichSumReducer<TimeWindow>(closeCalled)),
 			CountTrigger.of(TRIGGER_COUNT),
 			TimeEvictor.of(Time.seconds(2)),
-			0);
+			0,
+			null /* late data output tag */);
 
 
 		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
@@ -319,7 +322,8 @@ public class EvictingWindowOperatorTest {
 			new InternalIterableWindowFunction<>(new RichSumReducer<GlobalWindow>(closeCalled)),
 			CountTrigger.of(TRIGGER_COUNT),
 			TimeEvictor.of(Time.seconds(2), EVICT_AFTER),
-			0);
+			0,
+			null /* late data output tag */);
 
 
 		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
@@ -398,7 +402,8 @@ public class EvictingWindowOperatorTest {
 					return newDataPoint.f1 - oldDataPoint.f1;
 				}
 			}, EVICT_AFTER),
-			0);
+			0,
+			null /* late data output tag */);
 
 
 
@@ -475,7 +480,8 @@ public class EvictingWindowOperatorTest {
 					return newDataPoint.f1 - oldDataPoint.f1;
 				}
 			}, EVICT_AFTER),
-			0);
+			0,
+			null /* late data output tag */);
 
 
 
@@ -543,7 +549,8 @@ public class EvictingWindowOperatorTest {
 				new InternalIterableWindowFunction<>(new ReduceIterableWindowFunction<String, GlobalWindow, Tuple2<String, Integer>>(new SumReducer())),
 				CountTrigger.of(WINDOW_SLIDE),
 				CountEvictor.of(WINDOW_SIZE),
-				0);
+				0,
+				null /* late data output tag */);
 
 
 		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
@@ -615,7 +622,8 @@ public class EvictingWindowOperatorTest {
 			new InternalIterableWindowFunction<>(new RichSumReducer<GlobalWindow>(closeCalled)),
 			CountTrigger.of(WINDOW_SLIDE),
 			CountEvictor.of(WINDOW_SIZE),
-			0);
+			0,
+			null /* late data output tag */);
 
 
 		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
@@ -686,7 +694,8 @@ public class EvictingWindowOperatorTest {
 			new InternalIterableWindowFunction<>(new RichSumReducer<TimeWindow>(closeCalled)),
 			EventTimeTrigger.create(),
 			CountEvictor.of(WINDOW_SIZE),
-			0);
+			0,
+			null /* late data output tag */);
 
 		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
 				new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);

http://git-wip-us.apache.org/repos/asf/flink/blob/07a15d0e/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java
index 7c4d711..f70990f 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java
@@ -2322,7 +2322,8 @@ public class WindowOperatorContractTest extends TestLogger {
 				stateDescriptor,
 				windowFunction,
 				trigger,
-				allowedLatenss);
+				allowedLatenss,
+				null /*late data output tag */);
 
 		return new KeyedOneInputStreamOperatorTestHarness<>(
 				operator,

http://git-wip-us.apache.org/repos/asf/flink/blob/07a15d0e/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorMigrationTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorMigrationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorMigrationTest.java
index 7a356cf..19fa04f 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorMigrationTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorMigrationTest.java
@@ -100,7 +100,8 @@ public class WindowOperatorMigrationTest {
 				stateDesc,
 				new InternalIterableWindowFunction<>(new SessionWindowFunction()),
 				PurgingTrigger.of(CountTrigger.of(4)),
-				0);
+				0,
+				null /* late data output tag */);
 
 		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
 
@@ -181,7 +182,8 @@ public class WindowOperatorMigrationTest {
 				stateDesc,
 				new InternalIterableWindowFunction<>(new SessionWindowFunction()),
 				PurgingTrigger.of(CountTrigger.of(4)),
-				0);
+				0,
+				null /* late data output tag */);
 
 		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
 
@@ -258,7 +260,8 @@ public class WindowOperatorMigrationTest {
 				stateDesc,
 				new InternalSingleValueWindowFunction<>(new PassThroughWindowFunction<String, TimeWindow, Tuple2<String, Integer>>()),
 				EventTimeTrigger.create(),
-				0);
+				0,
+				null /* late data output tag */);
 
 		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
 
@@ -346,7 +349,8 @@ public class WindowOperatorMigrationTest {
 				stateDesc,
 				new InternalIterableWindowFunction<>(new RichSumReducer<TimeWindow>()),
 				EventTimeTrigger.create(),
-				0);
+				0,
+				null /* late data output tag */);
 
 		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
 
@@ -435,7 +439,8 @@ public class WindowOperatorMigrationTest {
 				stateDesc,
 				new InternalSingleValueWindowFunction<>(new PassThroughWindowFunction<String, TimeWindow, Tuple2<String, Integer>>()),
 				ProcessingTimeTrigger.create(),
-				0);
+				0,
+				null /* late data output tag */);
 
 		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
 
@@ -510,7 +515,8 @@ public class WindowOperatorMigrationTest {
 				stateDesc,
 				new InternalIterableWindowFunction<>(new RichSumReducer<TimeWindow>()),
 				ProcessingTimeTrigger.create(),
-				0);
+				0,
+				null /* late data output tag */);
 
 		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
 
@@ -633,6 +639,7 @@ public class WindowOperatorMigrationTest {
 				new InternalSingleValueWindowFunction<>(new PassThroughWindowFunction<String, TimeWindow, Tuple2<String, Integer>>()),
 				ProcessingTimeTrigger.create(),
 				0,
+				null /* late data output tag */,
 				LegacyWindowOperatorType.FAST_AGGREGATING);
 
 		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
@@ -735,6 +742,7 @@ public class WindowOperatorMigrationTest {
 				new InternalSingleValueWindowFunction<>(new PassThroughWindowFunction<String, TimeWindow, Tuple2<String, Integer>>()),
 				ProcessingTimeTrigger.create(),
 				0,
+				null /* late data output tag */,
 				LegacyWindowOperatorType.FAST_ACCUMULATING);
 
 		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =

http://git-wip-us.apache.org/repos/asf/flink/blob/07a15d0e/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
index a9c3ef6..b38cb2e 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
@@ -26,6 +26,7 @@ import org.apache.flink.api.common.state.FoldingStateDescriptor;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.ReducingStateDescriptor;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.util.OutputTag;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
@@ -92,6 +93,9 @@ public class WindowOperatorTest extends TestLogger {
 	// For counting if close() is called the correct number of times on the SumReducer
 	private static AtomicInteger closeCalled = new AtomicInteger(0);
 
+	// OutputTag under which late-arriving input elements are emitted as a side output
+	private static final OutputTag<Tuple2<String, Integer>> lateOutputTag = new OutputTag<Tuple2<String, Integer>>("late-output") {};
+
 	private void testSlidingEventTimeWindows(OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness) throws Exception {
 
 		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
@@ -181,7 +185,8 @@ public class WindowOperatorTest extends TestLogger {
 				stateDesc,
 				new InternalSingleValueWindowFunction<>(new PassThroughWindowFunction<String, TimeWindow, Tuple2<String, Integer>>()),
 				EventTimeTrigger.create(),
-				0);
+				0,
+				null /* late data output tag */);
 
 		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
 				new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
@@ -215,7 +220,8 @@ public class WindowOperatorTest extends TestLogger {
 				stateDesc,
 				new InternalIterableWindowFunction<>(new RichSumReducer<TimeWindow>()),
 				EventTimeTrigger.create(),
-				0);
+				0,
+				null /* late data output tag */);
 
 		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
 				new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
@@ -314,7 +320,8 @@ public class WindowOperatorTest extends TestLogger {
 				stateDesc,
 				new InternalSingleValueWindowFunction<>(new PassThroughWindowFunction<String, TimeWindow, Tuple2<String, Integer>>()),
 				EventTimeTrigger.create(),
-				0);
+				0,
+				null /* late data output tag */);
 
 		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
 				new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
@@ -346,7 +353,8 @@ public class WindowOperatorTest extends TestLogger {
 				stateDesc,
 				new InternalIterableWindowFunction<>(new RichSumReducer<TimeWindow>()),
 				EventTimeTrigger.create(),
-				0);
+				0,
+				null /* late data output tag */);
 
 		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
 				new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
@@ -381,7 +389,8 @@ public class WindowOperatorTest extends TestLogger {
 				stateDesc,
 				new InternalIterableWindowFunction<>(new SessionWindowFunction()),
 				EventTimeTrigger.create(),
-				0);
+				0,
+				null /* late data output tag */);
 
 		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness =
 				new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
@@ -453,7 +462,8 @@ public class WindowOperatorTest extends TestLogger {
 				stateDesc,
 				new InternalIterableProcessWindowFunction<>(new SessionProcessWindowFunction()),
 				EventTimeTrigger.create(),
-				0);
+				0,
+				null /* late data output tag */);
 
 		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness =
 				new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
@@ -525,7 +535,8 @@ public class WindowOperatorTest extends TestLogger {
 				stateDesc,
 				new InternalSingleValueWindowFunction<>(new ReducedSessionWindowFunction()),
 				EventTimeTrigger.create(),
-				0);
+				0,
+				null /* late data output tag */);
 
 		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness =
 				new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
@@ -595,7 +606,8 @@ public class WindowOperatorTest extends TestLogger {
 				stateDesc,
 				new InternalSingleValueProcessWindowFunction<>(new ReducedProcessSessionWindowFunction()),
 				EventTimeTrigger.create(),
-				0);
+				0,
+				null /* late data output tag */);
 
 		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness =
 				new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
@@ -669,7 +681,8 @@ public class WindowOperatorTest extends TestLogger {
 				stateDesc,
 				new InternalIterableWindowFunction<>(new SessionWindowFunction()),
 				PurgingTrigger.of(CountTrigger.of(4)),
-				0);
+				0,
+				null /* late data output tag */);
 
 		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness =
 				new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
@@ -739,7 +752,8 @@ public class WindowOperatorTest extends TestLogger {
 			stateDesc,
 			new InternalIterableWindowFunction<>(new SessionWindowFunction()),
 			ContinuousEventTimeTrigger.of(Time.seconds(2)),
-			0);
+			0,
+			null /* late data output tag */);
 
 		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness =
 			new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
@@ -843,7 +857,8 @@ public class WindowOperatorTest extends TestLogger {
 				stateDesc,
 				new InternalIterableWindowFunction<>(new SessionWindowFunction()),
 				EventTimeTrigger.create(),
-				0);
+				0,
+				null /* late data output tag */);
 
 		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness =
 				new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
@@ -901,7 +916,8 @@ public class WindowOperatorTest extends TestLogger {
 				stateDesc,
 				new InternalSingleValueWindowFunction<>(new PassThroughWindowFunction<String, GlobalWindow, Tuple2<String, Integer>>()),
 				ContinuousEventTimeTrigger.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)),
-				0);
+				0,
+				null /* late data output tag */);
 
 		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
 				new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
@@ -989,7 +1005,8 @@ public class WindowOperatorTest extends TestLogger {
 				stateDesc,
 				new InternalSingleValueWindowFunction<>(new PassThroughWindowFunction<String, GlobalWindow, Tuple2<String, Integer>>()),
 				PurgingTrigger.of(CountTrigger.of(WINDOW_SIZE)),
-				0);
+				0,
+				null /* late data output tag */);
 
 		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
 				new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
@@ -1030,7 +1047,8 @@ public class WindowOperatorTest extends TestLogger {
 				stateDesc,
 				new InternalSingleValueWindowFunction<>(new PassThroughWindowFunction<String, GlobalWindow, Tuple2<String, Integer>>()),
 				PurgingTrigger.of(CountTrigger.of(WINDOW_SIZE)),
-				0);
+				0,
+				null /* late data output tag */);
 
 		testHarness = new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
 
@@ -1075,7 +1093,9 @@ public class WindowOperatorTest extends TestLogger {
 				BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
 				stateDesc,
 				new InternalSingleValueWindowFunction<>(new PassThroughWindowFunction<String, TimeWindow, Tuple2<String, Integer>>()),
-				ProcessingTimeTrigger.create(), 0);
+				ProcessingTimeTrigger.create(),
+				0,
+				null /* late data output tag */);
 
 		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
 				new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
@@ -1132,7 +1152,9 @@ public class WindowOperatorTest extends TestLogger {
 				BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
 				stateDesc,
 				new InternalSingleValueWindowFunction<>(new PassThroughWindowFunction<String, TimeWindow, Tuple2<String, Integer>>()),
-				ProcessingTimeTrigger.create(), 0);
+				ProcessingTimeTrigger.create(),
+				0,
+				null /* late data output tag */);
 
 		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
 				new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
@@ -1202,7 +1224,9 @@ public class WindowOperatorTest extends TestLogger {
 				BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
 				stateDesc,
 				new InternalSingleValueWindowFunction<>(new PassThroughWindowFunction<String, TimeWindow, Tuple2<String, Integer>>()),
-				ProcessingTimeTrigger.create(), 0);
+				ProcessingTimeTrigger.create(),
+				0,
+				null /* late data output tag */);
 
 		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
 				new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
@@ -1267,7 +1291,8 @@ public class WindowOperatorTest extends TestLogger {
 				stateDesc,
 				new InternalSingleValueWindowFunction<>(new PassThroughWindowFunction<String, TimeWindow, Tuple2<String, Integer>>()),
 				PurgingTrigger.of(EventTimeTrigger.create()),
-				LATENESS);
+				LATENESS,
+				lateOutputTag);
 
 		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
 			new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
@@ -1275,6 +1300,7 @@ public class WindowOperatorTest extends TestLogger {
 		testHarness.open();
 		
 		ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();
+		ConcurrentLinkedQueue<Object> lateExpected = new ConcurrentLinkedQueue<>();
 
 		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 500));
 		testHarness.processWatermark(new Watermark(1500));
@@ -1287,7 +1313,7 @@ public class WindowOperatorTest extends TestLogger {
 		expected.add(new StreamRecord<>(new Tuple2<>("key2", 2), 1999));
 		expected.add(new Watermark(2300));
 
-		// this will not be dropped because window.maxTimestamp() + allowedLateness > currentWatermark
+		// this will not be side output because window.maxTimestamp() + allowedLateness > currentWatermark
 		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1997));
 		testHarness.processWatermark(new Watermark(6000));
 
@@ -1295,13 +1321,21 @@ public class WindowOperatorTest extends TestLogger {
 		expected.add(new StreamRecord<>(new Tuple2<>("key2", 1), 1999));
 		expected.add(new Watermark(6000));
 
-		// this will be dropped because window.maxTimestamp() + allowedLateness < currentWatermark
+		// this will be side output because window.maxTimestamp() + allowedLateness < currentWatermark
 		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1998));
 		testHarness.processWatermark(new Watermark(7000));
 
+		lateExpected.add(new StreamRecord<>(new Tuple2<>("key2", 1), 1998));
 		expected.add(new Watermark(7000));
 
 		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expected, testHarness.getOutput(), new Tuple2ResultSortComparator());
+		
+		TestHarnessUtil.assertOutputEqualsSorted(
+				"SideOutput was not correct.",
+				lateExpected,
+				testHarness.getSideOutput(lateOutputTag),
+				new Tuple2ResultSortComparator());
+
 		testHarness.close();
 	}
 
@@ -1327,7 +1361,8 @@ public class WindowOperatorTest extends TestLogger {
 					stateDesc,
 					new InternalSingleValueWindowFunction<>(new PassThroughWindowFunction<String, TimeWindow, Tuple2<String, Integer>>()),
 					EventTimeTrigger.create(),
-					LATENESS);
+					LATENESS,
+					null /* late data output tag */);
 
 		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
 			new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
@@ -1374,7 +1409,7 @@ public class WindowOperatorTest extends TestLogger {
 	}
 
 	@Test
-	public void testDropDueToLatenessTumbling() throws Exception {
+	public void testSideOutputDueToLatenessTumbling() throws Exception {
 		final int WINDOW_SIZE = 2;
 		final long LATENESS = 0;
 
@@ -1393,7 +1428,8 @@ public class WindowOperatorTest extends TestLogger {
 				stateDesc,
 				new InternalSingleValueWindowFunction<>(new PassThroughWindowFunction<String, TimeWindow, Tuple2<String, Integer>>()),
 				EventTimeTrigger.create(),
-				LATENESS);
+				LATENESS,
+				lateOutputTag);
 
 		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
 			new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
@@ -1401,6 +1437,7 @@ public class WindowOperatorTest extends TestLogger {
 		testHarness.open();
 
 		ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();
+		ConcurrentLinkedQueue<Object> sideExpected = new ConcurrentLinkedQueue<>();
 
 		// normal element
 		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1000));
@@ -1415,8 +1452,9 @@ public class WindowOperatorTest extends TestLogger {
 		expected.add(new StreamRecord<>(new Tuple2<>("key2", 2), 1999));
 		expected.add(new Watermark(1999));
 
-		// dropped as late
+		// side output as late; the record keeps its own timestamp since the input element itself is what gets emitted
 		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1998));
+		sideExpected.add(new StreamRecord<>(new Tuple2<>("key2", 1), 1998));
 
 		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 2001));
 		testHarness.processWatermark(new Watermark(2999));
@@ -1429,11 +1467,13 @@ public class WindowOperatorTest extends TestLogger {
 		expected.add(new Watermark(3999));
 
 		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expected, testHarness.getOutput(), new Tuple2ResultSortComparator());
+		TestHarnessUtil.assertOutputEqualsSorted("SideOutput was not correct.", sideExpected, testHarness.getSideOutput(
+				lateOutputTag), new Tuple2ResultSortComparator());
 		testHarness.close();
 	}
 
 	@Test
-	public void testDropDueToLatenessSliding() throws Exception {
+	public void testSideOutputDueToLatenessSliding() throws Exception {
 		final int WINDOW_SIZE = 3;
 		final int WINDOW_SLIDE = 1;
 		final long LATENESS = 0;
@@ -1453,7 +1493,8 @@ public class WindowOperatorTest extends TestLogger {
 				stateDesc,
 				new InternalSingleValueWindowFunction<>(new PassThroughWindowFunction<String, TimeWindow, Tuple2<String, Integer>>()),
 				EventTimeTrigger.create(),
-				LATENESS);
+				LATENESS,
+				lateOutputTag /* late data output tag */);
 
 		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
 			new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
@@ -1461,6 +1502,7 @@ public class WindowOperatorTest extends TestLogger {
 		testHarness.open();
 		
 		ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();
+		ConcurrentLinkedQueue<Object> sideExpected = new ConcurrentLinkedQueue<>();
 
 		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1000));
 		testHarness.processWatermark(new Watermark(1999));
@@ -1497,19 +1539,21 @@ public class WindowOperatorTest extends TestLogger {
 
 		expected.add(new Watermark(6000));
 
-		// dropped due to lateness
+		// emitted to the side output due to lateness
 		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 3001));
+		sideExpected.add(new StreamRecord<>(new Tuple2<>("key1", 1), 3001));
 
 		testHarness.processWatermark(new Watermark(25000));
 
 		expected.add(new Watermark(25000));
 
 		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expected, testHarness.getOutput(), new Tuple2ResultSortComparator());
+		TestHarnessUtil.assertOutputEqualsSorted("SideOutput was not correct.", sideExpected, testHarness.getSideOutput(lateOutputTag), new Tuple2ResultSortComparator());
 		testHarness.close();
 	}
 
 	@Test
-	public void testDropDueToLatenessSessionZeroLatenessPurgingTrigger() throws Exception {
+	public void testSideOutputDueToLatenessSessionZeroLatenessPurgingTrigger() throws Exception {
 		final int GAP_SIZE = 3;
 		final long LATENESS = 0;
 
@@ -1528,7 +1572,8 @@ public class WindowOperatorTest extends TestLogger {
 				stateDesc,
 				new InternalSingleValueWindowFunction<>(new ReducedSessionWindowFunction()),
 				PurgingTrigger.of(EventTimeTrigger.create()),
-				LATENESS);
+				LATENESS,
+				lateOutputTag);
 
 		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness =
 			new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
@@ -1536,6 +1581,7 @@ public class WindowOperatorTest extends TestLogger {
 		testHarness.open();
 		
 		ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();
+		ConcurrentLinkedQueue<Object> sideExpected = new ConcurrentLinkedQueue<>();
 
 		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1000));
 		testHarness.processWatermark(new Watermark(1999));
@@ -1571,10 +1617,13 @@ public class WindowOperatorTest extends TestLogger {
 		expected.add(new StreamRecord<>(new Tuple3<>("key2-1", 11600L, 14600L), 14599));
 		expected.add(new Watermark(14600));
 
-		// this is dropped as late
+		// this is side output as late
 		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 10000));
-		// this is also dropped as late (we test that they are not accidentally merged)
+		sideExpected.add(new StreamRecord<>(new Tuple2<>("key2", 1), 10000));
+
+		// this is also side output as late (we test that they are not accidentally merged)
 		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 10100));
+		sideExpected.add(new StreamRecord<>(new Tuple2<>("key2", 1), 10100));
 
 		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 14500));
 		testHarness.processWatermark(new Watermark(20000));
@@ -1587,19 +1636,16 @@ public class WindowOperatorTest extends TestLogger {
 		expected.add(new Watermark(100000));
 
 		ConcurrentLinkedQueue<Object> actual = testHarness.getOutput();
+		ConcurrentLinkedQueue<Object> sideActual = testHarness.getSideOutput(lateOutputTag);
 
 		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expected, actual, new Tuple2ResultSortComparator());
+		TestHarnessUtil.assertOutputEqualsSorted("SideOutput was not correct.", sideExpected, sideActual, new Tuple2ResultSortComparator());
+
 		testHarness.close();
 	}
 
 	@Test
-	public void testDropDueToLatenessSessionZeroLateness() throws Exception {
-		// same as testDropDueToLatenessSessionZeroLateness() but with an accumulating trigger, i.e.
-		// one that does not return FIRE_AND_PURGE when firing but just FIRE
-
-		// this has the same output as testDropDueToLatenessSessionZeroLateness() because
-		// accumulating/discarding does not make a difference with "allowed lateness" = 0.
-
+	public void testSideOutputDueToLatenessSessionZeroLateness() throws Exception {
 		final int GAP_SIZE = 3;
 		final long LATENESS = 0;
 
@@ -1618,7 +1664,8 @@ public class WindowOperatorTest extends TestLogger {
 				stateDesc,
 				new InternalSingleValueWindowFunction<>(new ReducedSessionWindowFunction()),
 				EventTimeTrigger.create(),
-				LATENESS);
+				LATENESS,
+				lateOutputTag);
 
 		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness =
 			new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
@@ -1626,6 +1673,7 @@ public class WindowOperatorTest extends TestLogger {
 		testHarness.open();
 		
 		ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();
+		ConcurrentLinkedQueue<Object> sideExpected = new ConcurrentLinkedQueue<>();
 
 		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1000));
 		testHarness.processWatermark(new Watermark(1999));
@@ -1661,8 +1709,9 @@ public class WindowOperatorTest extends TestLogger {
 		expected.add(new StreamRecord<>(new Tuple3<>("key2-1", 11600L, 14600L), 14599));
 		expected.add(new Watermark(14600));
 
-		// this is dropped as late
+		// this is side output as late and keeps its original timestamp
 		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 10000));
+		sideExpected.add(new StreamRecord<>(new Tuple2<>("key2", 1), 10000));
 
 		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 14500));
 		testHarness.processWatermark(new Watermark(20000));
@@ -1674,14 +1723,16 @@ public class WindowOperatorTest extends TestLogger {
 		expected.add(new Watermark(100000));
 
 		ConcurrentLinkedQueue<Object> actual = testHarness.getOutput();
+		ConcurrentLinkedQueue<Object> sideActual = testHarness.getSideOutput(lateOutputTag);
 		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expected, actual, new Tuple2ResultSortComparator());
+		TestHarnessUtil.assertOutputEqualsSorted("SideOutput was not correct.", sideExpected, sideActual, new Tuple2ResultSortComparator());
 		testHarness.close();
 	}
 
 	@Test
 	public void testDropDueToLatenessSessionWithLatenessPurgingTrigger() throws Exception {
 
-		// this has the same output as testDropDueToLatenessSessionZeroLateness() because
+		// this has the same output as testSideOutputDueToLatenessSessionZeroLateness() because
 		// the allowed lateness is too small to make a difference
 
 		final int GAP_SIZE = 3;
@@ -1702,7 +1753,8 @@ public class WindowOperatorTest extends TestLogger {
 				stateDesc,
 				new InternalSingleValueWindowFunction<>(new ReducedSessionWindowFunction()),
 				PurgingTrigger.of(EventTimeTrigger.create()),
-				LATENESS);
+				LATENESS,
+				lateOutputTag);
 
 		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness =
 			new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
@@ -1710,7 +1762,7 @@ public class WindowOperatorTest extends TestLogger {
 		testHarness.open();
 
 		ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();
-		
+
 		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1000));
 		testHarness.processWatermark(new Watermark(1999));
 
@@ -1759,13 +1811,14 @@ public class WindowOperatorTest extends TestLogger {
 		expected.add(new Watermark(100000));
 
 		ConcurrentLinkedQueue<Object> actual = testHarness.getOutput();
+
 		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expected, actual, new Tuple3ResultSortComparator());
 		testHarness.close();
 	}
 
 	@Test
-	public void testDropDueToLatenessSessionWithLateness() throws Exception {
-		// same as testDropDueToLatenessSessionWithLateness() but with an accumulating trigger, i.e.
+	public void testNotSideOutputDueToLatenessSessionWithLateness() throws Exception {
+		// same as testDropDueToLatenessSessionWithLatenessPurgingTrigger() but with an accumulating trigger, i.e.
 		// one that does not return FIRE_AND_PURGE when firing but just FIRE. The expected
 		// results are therefore slightly different.
 
@@ -1787,7 +1840,8 @@ public class WindowOperatorTest extends TestLogger {
 				stateDesc,
 				new InternalSingleValueWindowFunction<>(new ReducedSessionWindowFunction()),
 				EventTimeTrigger.create(),
-				LATENESS);
+				LATENESS,
+				null /* late data output tag */);
 
 		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness =
 			new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
@@ -1806,7 +1860,7 @@ public class WindowOperatorTest extends TestLogger {
 
 		expected.add(new Watermark(4998));
 
-		// this will not be dropped because the session we're adding two has maxTimestamp
+		// this will not be side output because the session we're adding to has maxTimestamp
 		// after the current watermark
 		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 4500));
 
@@ -1832,7 +1886,7 @@ public class WindowOperatorTest extends TestLogger {
 
 		// because of the small allowed lateness and because the trigger is accumulating
 		// this will be merged into the session (11600-14600) and therefore will not
-		// be dropped as late
+		// be sideoutput as late
 		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 10000));
 
 		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 14500));
@@ -1843,7 +1897,10 @@ public class WindowOperatorTest extends TestLogger {
 		expected.add(new StreamRecord<>(new Tuple3<>("key2-2", 10000L, 14600L), 14599));
 
 		ConcurrentLinkedQueue<Object> actual = testHarness.getOutput();
+		ConcurrentLinkedQueue<Object> sideActual = testHarness.getSideOutput(lateOutputTag);
+
 		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expected, actual, new Tuple3ResultSortComparator());
+		assertEquals(null, sideActual);
 
 		testHarness.processWatermark(new Watermark(20000));
 
@@ -1855,12 +1912,15 @@ public class WindowOperatorTest extends TestLogger {
 		expected.add(new Watermark(100000));
 
 		actual = testHarness.getOutput();
+		sideActual = testHarness.getSideOutput(lateOutputTag);
 		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expected, actual, new Tuple3ResultSortComparator());
+		assertEquals(null, sideActual);
+
 		testHarness.close();
 	}
 
 	@Test
-	public void testDropDueToLatenessSessionWithHugeLatenessPurgingTrigger() throws Exception {
+	public void testNotSideOutputDueToLatenessSessionWithHugeLatenessPurgingTrigger() throws Exception {
 
 		final int GAP_SIZE = 3;
 		final long LATENESS = 10000;
@@ -1880,7 +1940,8 @@ public class WindowOperatorTest extends TestLogger {
 				stateDesc,
 				new InternalSingleValueWindowFunction<>(new ReducedSessionWindowFunction()),
 				PurgingTrigger.of(EventTimeTrigger.create()),
-				LATENESS);
+				LATENESS,
+				null /* late data output tag */);
 
 		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness =
 			new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
@@ -1899,7 +1960,7 @@ public class WindowOperatorTest extends TestLogger {
 
 		expected.add(new Watermark(4998));
 
-		// this will not be dropped because the session we're adding two has maxTimestamp
+		// this will not be sideoutput because the session we're adding to has maxTimestamp
 		// after the current watermark
 		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 4500));
 
@@ -1928,7 +1989,9 @@ public class WindowOperatorTest extends TestLogger {
 		expected.add(new StreamRecord<>(new Tuple3<>("key2-1", 1000L, 14600L), 14599));
 
 		ConcurrentLinkedQueue<Object> actual = testHarness.getOutput();
+		ConcurrentLinkedQueue<Object> sideActual = testHarness.getSideOutput(lateOutputTag);
 		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expected, actual, new Tuple3ResultSortComparator());
+		assertEquals(null, sideActual);
 
 		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 14500));
 		testHarness.processWatermark(new Watermark(20000));
@@ -1941,12 +2004,15 @@ public class WindowOperatorTest extends TestLogger {
 		expected.add(new Watermark(100000));
 
 		actual = testHarness.getOutput();
+		sideActual = testHarness.getSideOutput(lateOutputTag);
 		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expected, actual, new Tuple3ResultSortComparator());
+		assertEquals(null, sideActual);
+
 		testHarness.close();
 	}
 
 	@Test
-	public void testDropDueToLatenessSessionWithHugeLateness() throws Exception {
+	public void testNotSideOutputDueToLatenessSessionWithHugeLateness() throws Exception {
 		final int GAP_SIZE = 3;
 		final long LATENESS = 10000;
 
@@ -1965,7 +2031,8 @@ public class WindowOperatorTest extends TestLogger {
 				stateDesc,
 				new InternalSingleValueWindowFunction<>(new ReducedSessionWindowFunction()),
 				EventTimeTrigger.create(),
-				LATENESS);
+				LATENESS,
+				null /* late data output tag */);
 
 		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness =
 			new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
@@ -1984,7 +2051,7 @@ public class WindowOperatorTest extends TestLogger {
 
 		expected.add(new Watermark(4998));
 
-		// this will not be dropped because the session we're adding two has maxTimestamp
+		// this will not be sideoutput because the session we're adding to has maxTimestamp
 		// after the current watermark
 		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 4500));
 
@@ -2015,7 +2082,9 @@ public class WindowOperatorTest extends TestLogger {
 		expected.add(new StreamRecord<>(new Tuple3<>("key2-7", 1000L, 14600L), 14599));
 
 		ConcurrentLinkedQueue<Object> actual = testHarness.getOutput();
+		ConcurrentLinkedQueue<Object> sideActual = testHarness.getSideOutput(lateOutputTag);
 		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expected, actual, new Tuple3ResultSortComparator());
+		assertEquals(null, sideActual);
 
 		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 14500));
 		testHarness.processWatermark(new Watermark(20000));
@@ -2027,7 +2096,11 @@ public class WindowOperatorTest extends TestLogger {
 		expected.add(new Watermark(100000));
 
 		actual = testHarness.getOutput();
+		sideActual = testHarness.getSideOutput(lateOutputTag);
+
 		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expected, actual, new Tuple3ResultSortComparator());
+		assertEquals(null, sideActual);
+
 		testHarness.close();
 	}
 
@@ -2050,7 +2123,8 @@ public class WindowOperatorTest extends TestLogger {
 				windowStateDesc,
 				new InternalIterableWindowFunction<>(new PassThroughFunction2()),
 					new EventTimeTriggerAccumGC(LATENESS),
-				LATENESS);
+				LATENESS,
+				null /* late data output tag */);
 
 		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, String> testHarness =
 			new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
@@ -2104,7 +2178,8 @@ public class WindowOperatorTest extends TestLogger {
 				windowStateDesc,
 				new InternalIterableWindowFunction<>(new PassThroughFunction()),
 				EventTimeTrigger.create(),
-				LATENESS);
+				LATENESS,
+				null /* late data output tag */);
 
 		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
 			new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
@@ -2150,7 +2225,8 @@ public class WindowOperatorTest extends TestLogger {
 				stateDesc,
 				new InternalSingleValueWindowFunction<>(new PassThroughWindowFunction<String, TimeWindow, Tuple2<String, Integer>>()),
 				EventTimeTrigger.create(),
-				LATENESS);
+				LATENESS,
+				null /* late data output tag */);
 
 		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
 			new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
@@ -2207,7 +2283,8 @@ public class WindowOperatorTest extends TestLogger {
 				windowStateDesc,
 				new InternalSingleValueWindowFunction<>(new PassThroughFunction()),
 				EventTimeTrigger.create(),
-				LATENESS);
+				LATENESS,
+				null /* late data output tag */);
 
 		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
 			new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
@@ -2252,7 +2329,8 @@ public class WindowOperatorTest extends TestLogger {
 				windowStateDesc,
 				new InternalIterableWindowFunction<>(new PassThroughFunction()),
 				EventTimeTrigger.create(),
-				LATENESS);
+				LATENESS,
+				null /* late data output tag */);
 
 		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
 			new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
@@ -2296,7 +2374,8 @@ public class WindowOperatorTest extends TestLogger {
 				stateDesc,
 				new InternalSingleValueWindowFunction<>(new ReducedSessionWindowFunction()),
 				EventTimeTrigger.create(),
-				LATENESS);
+				LATENESS,
+				null /* late data output tag */);
 
 		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness =
 			new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
@@ -2351,7 +2430,8 @@ public class WindowOperatorTest extends TestLogger {
 				windowStateDesc,
 				new InternalSingleValueWindowFunction<>(new PassThroughFunction()),
 				EventTimeTrigger.create(),
-				LATENESS);
+				LATENESS,
+				null /* late data output tag */);
 
 		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
 			new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);

http://git-wip-us.apache.org/repos/asf/flink/blob/07a15d0e/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala
index 694353c..757e45f 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala
@@ -30,7 +30,7 @@ import org.apache.flink.streaming.api.windowing.evictors.Evictor
 import org.apache.flink.streaming.api.windowing.time.Time
 import org.apache.flink.streaming.api.windowing.triggers.Trigger
 import org.apache.flink.streaming.api.windowing.windows.Window
-import org.apache.flink.util.Collector
+import org.apache.flink.util.{Collector, OutputTag}
 import org.apache.flink.util.Preconditions.checkNotNull
 
 /**
@@ -72,6 +72,20 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) {
   }
 
   /**
+   * Send late arriving data to the side output identified by the given [[OutputTag]]. Data
+   * is considered late after the watermark has passed the end of the window plus the allowed
+   * lateness set using [[allowedLateness(Time)]].
+   *
+   * You can get the stream of late data using [[DataStream.getSideOutput()]] on the [[DataStream]]
+   * resulting from the windowed operation with the same [[OutputTag]].
+   */
+  @PublicEvolving
+  def sideOutputLateData(outputTag: OutputTag[T]): AllWindowedStream[T, W] = {
+    javaStream.sideOutputLateData(outputTag)
+    this
+  }
+
+  /**
    * Sets the [[Trigger]] that should be used to trigger window emission.
    */
   @PublicEvolving

http://git-wip-us.apache.org/repos/asf/flink/blob/07a15d0e/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala
index 32a9f60..4e0e1a4 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala
@@ -19,8 +19,6 @@
 package org.apache.flink.streaming.api.scala
 
 import org.apache.flink.annotation.{Public, PublicEvolving}
-import org.apache.flink.api.common.functions.{FoldFunction, ReduceFunction}
-import org.apache.flink.annotation.{PublicEvolving, Public}
 import org.apache.flink.api.common.functions.{AggregateFunction, FoldFunction, ReduceFunction}
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.streaming.api.datastream.{WindowedStream => JavaWStream}
@@ -32,7 +30,7 @@ import org.apache.flink.streaming.api.windowing.evictors.Evictor
 import org.apache.flink.streaming.api.windowing.time.Time
 import org.apache.flink.streaming.api.windowing.triggers.Trigger
 import org.apache.flink.streaming.api.windowing.windows.Window
-import org.apache.flink.util.Collector
+import org.apache.flink.util.{Collector, OutputTag}
 
 /**
  * A [[WindowedStream]] represents a data stream where elements are grouped by
@@ -76,6 +74,20 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) {
   }
 
   /**
+   * Send late arriving data to the side output identified by the given [[OutputTag]]. Data
+   * is considered late after the watermark has passed the end of the window plus the allowed
+   * lateness set using [[allowedLateness(Time)]].
+   *
+   * You can get the stream of late data using [[DataStream.getSideOutput()]] on the [[DataStream]]
+   * resulting from the windowed operation with the same [[OutputTag]].
+   */
+  @PublicEvolving
+  def sideOutputLateData(outputTag: OutputTag[T]): WindowedStream[T, K, W] = {
+    javaStream.sideOutputLateData(outputTag)
+    this
+  }
+
+  /**
    * Sets the [[Trigger]] that should be used to trigger window emission.
    */
   @PublicEvolving

http://git-wip-us.apache.org/repos/asf/flink/blob/07a15d0e/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SideOutputITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SideOutputITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SideOutputITCase.java
new file mode 100644
index 0000000..2f92897
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SideOutputITCase.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.streaming.runtime;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.util.OutputTag;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
+import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
+import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
+import org.apache.flink.test.streaming.runtime.util.TestListResultSink;
+import org.apache.flink.util.Collector;
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Integration test for streaming programs using side outputs.
+ */
+public class SideOutputITCase extends StreamingMultipleProgramsTestBase {
+
+	static List<Integer> elements = new ArrayList<>();
+	static {
+		elements.add(1);
+		elements.add(2);
+		elements.add(5);
+		elements.add(3);
+		elements.add(4);
+	}
+
+	private static class TestWatermarkAssigner implements AssignerWithPunctuatedWatermarks<Integer> {
+		private static final long serialVersionUID = 1L;
+
+		@Nullable
+		@Override
+		public Watermark checkAndGetNextWatermark(Integer lastElement, long extractedTimestamp) {
+			return new Watermark(extractedTimestamp);
+		}
+
+		@Override
+		public long extractTimestamp(Integer element, long previousElementTimestamp) {
+			return Long.valueOf(element);
+		}
+	}
+
+	private static class TestKeySelector implements KeySelector<Integer, Integer> {
+		private static final long serialVersionUID = 1L;
+		
+		@Override
+		public Integer getKey(Integer value) throws Exception {
+			return value;
+		}
+	}
+
+	/**
+	 * Tests that late-arriving elements of a non-keyed (all-)window are emitted to the late-data side output.
+	 */
+	@Test
+	public void testAllWindowLateArrivingEvents() throws Exception {
+		TestListResultSink<String> sideOutputResultSink = new TestListResultSink<>();
+
+		StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
+		see.setParallelism(1);
+		see.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+
+		DataStream<Integer> dataStream = see.fromCollection(elements);
+
+		OutputTag<Integer> lateDataTag = new OutputTag<Integer>("late"){};
+
+		SingleOutputStreamOperator<Integer> windowOperator = dataStream
+				.assignTimestampsAndWatermarks(new TestWatermarkAssigner())
+				.timeWindowAll(Time.milliseconds(1), Time.milliseconds(1))
+				.sideOutputLateData(lateDataTag)
+				.apply(new AllWindowFunction<Integer, Integer, TimeWindow>() {
+					private static final long serialVersionUID = 1L;
+					
+					@Override
+					public void apply(TimeWindow window, Iterable<Integer> values, Collector<Integer> out) throws Exception {
+							for(Integer val : values) {
+								out.collect(val);
+							}
+					}
+				});
+
+		windowOperator
+				.getSideOutput(lateDataTag)
+				.flatMap(new FlatMapFunction<Integer, String>() {
+					private static final long serialVersionUID = 1L;
+
+					@Override
+					public void flatMap(Integer value, Collector<String> out) throws Exception {
+						out.collect("late-" + String.valueOf(value));
+					}
+				})
+				.addSink(sideOutputResultSink);
+
+		see.execute();
+		assertEquals(sideOutputResultSink.getSortedResult(), Arrays.asList("late-3", "late-4"));
+
+	}
+
+	@Test
+	public void testKeyedWindowLateArrivingEvents() throws Exception {
+		TestListResultSink<String> resultSink = new TestListResultSink<>();
+		TestListResultSink<Integer> lateResultSink = new TestListResultSink<>();
+
+		StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
+		see.setParallelism(3);
+		see.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+
+		DataStream<Integer> dataStream = see.fromCollection(elements);
+
+		OutputTag<Integer> lateDataTag = new OutputTag<Integer>("late"){};
+
+		SingleOutputStreamOperator<String> windowOperator = dataStream
+				.assignTimestampsAndWatermarks(new TestWatermarkAssigner())
+				.keyBy(new TestKeySelector())
+				.timeWindow(Time.milliseconds(1), Time.milliseconds(1))
+				.allowedLateness(Time.milliseconds(2))
+				.sideOutputLateData(lateDataTag)
+				.apply(new WindowFunction<Integer, String, Integer, TimeWindow>() {
+					private static final long serialVersionUID = 1L;
+
+					@Override
+					public void apply(Integer key, TimeWindow window, Iterable<Integer> input, Collector<String> out) throws Exception {
+						for(Integer val : input) {
+							out.collect(String.valueOf(key) + "-" + String.valueOf(val));
+						}
+					}
+				});
+
+		windowOperator
+				.addSink(resultSink);
+
+		windowOperator
+				.getSideOutput(lateDataTag)
+				.addSink(lateResultSink);
+
+		see.execute();
+		assertEquals(Arrays.asList("1-1", "2-2", "4-4", "5-5"), resultSink.getSortedResult());
+		assertEquals(Collections.singletonList(3), lateResultSink.getSortedResult());
+	}
+
+}


Mime
View raw message