flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aljos...@apache.org
Subject [3/4] flink git commit: [FLINK-3174] Add MergingWindowAssigner and SessionWindows
Date Tue, 05 Apr 2016 15:19:17 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/6cd8ceb1/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
index ecad9b2..7c4ef7f 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
@@ -21,6 +21,7 @@ package org.apache.flink.streaming.runtime.operators.windowing;
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.AppendingState;
 import org.apache.flink.api.common.state.MergingState;
 import org.apache.flink.api.common.state.State;
 import org.apache.flink.api.common.state.StateDescriptor;
@@ -29,6 +30,7 @@ import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.api.java.typeutils.InputTypeConfigurable;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.core.memory.DataInputView;
@@ -39,9 +41,9 @@ import org.apache.flink.streaming.api.operators.ChainingStrategy;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.TimestampedCollector;
 import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.api.windowing.assigners.MergingWindowAssigner;
 import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
 import org.apache.flink.streaming.api.windowing.triggers.Trigger;
-import org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext;
 import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
 import org.apache.flink.streaming.api.windowing.windows.Window;
 import org.apache.flink.streaming.runtime.operators.Triggerable;
@@ -53,7 +55,9 @@ import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.Serializable;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Map;
 import java.util.PriorityQueue;
 import java.util.Set;
 
@@ -98,7 +102,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 
 	protected final Trigger<? super IN, ? super W> trigger;
 
-	protected final StateDescriptor<? extends MergingState<IN, ACC>, ?> windowStateDescriptor;
+	protected final StateDescriptor<? extends AppendingState<IN, ACC>, ?> windowStateDescriptor;
 
 	/**
 	 * This is used to copy the incoming element because it can be put into several window
@@ -149,6 +153,8 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 	protected transient Set<Timer<K, W>> watermarkTimers;
 	protected transient PriorityQueue<Timer<K, W>> watermarkTimersQueue;
 
+	protected transient Map<K, MergingWindowSet<W>> mergingWindowsByKey;
+
 	/**
 	 * Creates a new {@code WindowOperator} based on the given policies and user functions.
 	 */
@@ -156,7 +162,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 		TypeSerializer<W> windowSerializer,
 		KeySelector<IN, K> keySelector,
 		TypeSerializer<K> keySerializer,
-		StateDescriptor<? extends MergingState<IN, ACC>, ?> windowStateDescriptor,
+		StateDescriptor<? extends AppendingState<IN, ACC>, ?> windowStateDescriptor,
 		InternalWindowFunction<ACC, OUT, K, W> windowFunction,
 		Trigger<? super IN, ? super W> trigger) {
 
@@ -184,6 +190,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 		inputSerializer = (TypeSerializer<IN>) type.createSerializer(executionConfig);
 	}
 
+	@SuppressWarnings("unchecked")
 	@Override
 	public final void open() throws Exception {
 		super.open();
@@ -205,6 +212,10 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 		}
 
 		context = new Context(null, null);
+
+		if (windowAssigner instanceof MergingWindowAssigner) {
+			mergingWindowsByKey = new HashMap<>();
+		}
 	}
 
 	@Override
@@ -219,52 +230,117 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 
 		K key = (K) getStateBackend().getCurrentKey();
 
-		for (W window: elementWindows) {
+		if (windowAssigner instanceof MergingWindowAssigner) {
+			MergingWindowSet<W> mergingWindows = mergingWindowsByKey.get(getStateBackend().getCurrentKey());
+			if (mergingWindows == null) {
+				mergingWindows = new MergingWindowSet<>((MergingWindowAssigner<? super IN, W>) windowAssigner);
+				mergingWindowsByKey.put(key, mergingWindows);
+			}
+
+
+			for (W window: elementWindows) {
+				// If there is a merge, it can only result in a window that contains our new
+				// element because we always eagerly merge
+				final Tuple1<TriggerResult> mergeTriggerResult = new Tuple1<>(TriggerResult.CONTINUE);
+
+
+				// adding the new window might result in a merge, in that case the actualWindow
+				// is the merged window and we work with that. If we don't merge then
+				// actualWindow == window
+				W actualWindow = mergingWindows.addWindow(window, new MergingWindowSet.MergeFunction<W>() {
+					@Override
+					public void merge(W mergeResult,
+							Collection<W> mergedWindows, W stateWindowResult,
+							Collection<W> mergedStateWindows) throws Exception {
+						context.window = mergeResult;
+
+						// store for later use
+						mergeTriggerResult.f0 = context.onMerge(mergedWindows);
 
-			MergingState<IN, ACC> windowState = getPartitionedState(window, windowSerializer,
-				windowStateDescriptor);
+						for (W m: mergedWindows) {
+							context.window = m;
+							context.clear();
+						}
 
-			windowState.add(element.getValue());
+						// merge the merged state windows into the newly resulting state window
+						getStateBackend().mergePartitionedStates(stateWindowResult,
+								mergedStateWindows,
+								windowSerializer,
+								(StateDescriptor<? extends MergingState<?,?>, ?>) windowStateDescriptor);
+					}
+				});
 
-			context.key = key;
-			context.window = window;
-			TriggerResult triggerResult = context.onElement(element);
+				W stateWindow = mergingWindows.getStateWindow(actualWindow);
+				AppendingState<IN, ACC> windowState = getPartitionedState(stateWindow, windowSerializer, windowStateDescriptor);
+				windowState.add(element.getValue());
 
-			processTriggerResult(triggerResult, key, window);
+				context.key = key;
+				context.window = actualWindow;
+
+				// we might have already fired because of a merge but still call onElement
+				// on the (possibly merged) window
+				TriggerResult triggerResult = context.onElement(element);
+
+				TriggerResult combinedTriggerResult = TriggerResult.merge(triggerResult, mergeTriggerResult.f0);
+
+				processTriggerResult(combinedTriggerResult, key, actualWindow);
+			}
+
+		} else {
+			for (W window: elementWindows) {
+
+				AppendingState<IN, ACC> windowState = getPartitionedState(window, windowSerializer,
+						windowStateDescriptor);
+
+				windowState.add(element.getValue());
+
+				context.key = key;
+				context.window = window;
+				TriggerResult triggerResult = context.onElement(element);
+
+				processTriggerResult(triggerResult, key, window);
+			}
 		}
 	}
 
+	@SuppressWarnings("unchecked")
 	protected void processTriggerResult(TriggerResult triggerResult, K key, W window) throws Exception {
 		if (!triggerResult.isFire() && !triggerResult.isPurge()) {
 			// do nothing
 			return;
 		}
 
-		if (triggerResult.isFire()) {
-			timestampedCollector.setAbsoluteTimestamp(window.maxTimestamp());
+		AppendingState<IN, ACC> windowState;
+
+		MergingWindowSet<W> mergingWindows = null;
+
+		if (windowAssigner instanceof MergingWindowAssigner) {
+			mergingWindows = mergingWindowsByKey.get(key);
+			W stateWindow = mergingWindows.getStateWindow(window);
+			windowState = getPartitionedState(stateWindow, windowSerializer, windowStateDescriptor);
 
-			MergingState<IN, ACC> windowState = getPartitionedState(window, windowSerializer,
-				windowStateDescriptor);
+		} else {
+			windowState = getPartitionedState(window, windowSerializer, windowStateDescriptor);
+		}
 
+		if (triggerResult.isFire()) {
+			timestampedCollector.setAbsoluteTimestamp(window.maxTimestamp());
 			ACC contents = windowState.get();
 
 			userFunction.apply(context.key, context.window, contents, timestampedCollector);
 
-			if (triggerResult.isPurge()) {
-				windowState.clear();
-				context.clear();
-			}
-		} else if (triggerResult.isPurge()) {
-			MergingState<IN, ACC> windowState = getPartitionedState(window, windowSerializer,
-				windowStateDescriptor);
+		}
+		if (triggerResult.isPurge()) {
 			windowState.clear();
+			if (mergingWindows != null) {
+				mergingWindows.retireWindow(window);
+			}
 			context.clear();
 		}
 	}
 
 	@Override
 	public final void processWatermark(Watermark mark) throws Exception {
-
 		boolean fire;
 
 		do {
@@ -323,10 +399,12 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 	 * by setting the {@code key} and {@code window} fields. No internal state must be kept in
 	 * the {@code Context}
 	 */
-	protected class Context implements TriggerContext {
+	public class Context implements Trigger.OnMergeContext {
 		protected K key;
 		protected W window;
 
+		protected Collection<W> mergedWindows;
+
 		public Context(K key, W window) {
 			this.key = key;
 			this.window = window;
@@ -377,6 +455,20 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 		}
 
 		@Override
+		public <S extends MergingState<?, ?>> void mergePartitionedState(StateDescriptor<S, ?> stateDescriptor) {
+			if (mergedWindows != null && mergedWindows.size() > 0) {
+				try {
+					WindowOperator.this.getStateBackend().mergePartitionedStates(window,
+							mergedWindows,
+							windowSerializer,
+							stateDescriptor);
+				} catch (Exception e) {
+					throw new RuntimeException("Error while merging state.", e);
+				}
+			}
+		}
+
+		@Override
 		public void registerProcessingTimeTimer(long time) {
 			Timer<K, W> timer = new Timer<>(time, key, window);
 			if (processingTimeTimers.add(timer)) {
@@ -428,6 +520,11 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 			return trigger.onEventTime(time, window, this);
 		}
 
+		public TriggerResult onMerge(Collection<W> mergedWindows) throws Exception {
+			this.mergedWindows = mergedWindows;
+			return trigger.onMerge(window, this);
+		}
+
 		public void clear() throws Exception {
 			trigger.clear(window, this);
 		}
@@ -580,7 +677,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 	}
 
 	@VisibleForTesting
-	public StateDescriptor<? extends MergingState<IN, ACC>, ?> getStateDescriptor() {
+	public StateDescriptor<? extends AppendingState<IN, ACC>, ?> getStateDescriptor() {
 		return windowStateDescriptor;
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/6cd8ceb1/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java
index 84c83b9..4c4ed8a 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java
@@ -24,11 +24,14 @@ import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.ReducingStateDescriptor;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.datastream.AllWindowedStream;
 import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.transformations.OneInputTransformation;
+import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
 import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
 import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
 import org.apache.flink.streaming.api.windowing.evictors.CountEvictor;
@@ -36,6 +39,8 @@ import org.apache.flink.streaming.api.windowing.evictors.TimeEvictor;
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
 import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
 import org.apache.flink.util.Collector;
@@ -44,6 +49,8 @@ import org.junit.Test;
 
 import java.util.concurrent.TimeUnit;
 
+import static org.junit.Assert.fail;
+
 /**
  * These tests verify that the api calls on
  * {@link org.apache.flink.streaming.api.datastream.AllWindowedStream} instantiate
@@ -235,6 +242,83 @@ public class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase
 		Assert.assertTrue(winOperator2.getStateDescriptor() instanceof ListStateDescriptor);
 	}
 
+	@Test
+	public void testSessionWithFold() throws Exception {
+		// verify that fold does not work with merging windows
+
+		StreamExecutionEnvironment env = LocalStreamEnvironment.createLocalEnvironment();
+
+		AllWindowedStream<String, TimeWindow> windowedStream = env.fromElements("Hello", "Ciao")
+				.windowAll(EventTimeSessionWindows.withGap(Time.seconds(5)));
+
+		try {
+			windowedStream.fold("", new FoldFunction<String, String>() {
+				@Override
+				public String fold(String accumulator, String value) throws Exception {
+					return accumulator;
+				}
+			});
+		} catch (UnsupportedOperationException e) {
+			// expected
+			// use a catch to ensure that the exception is thrown by the fold
+			return;
+		}
+
+		fail("The fold call should fail.");
+
+		env.execute();
+	}
+
+	@Test
+	public void testMergingAssignerWithNonMergingTrigger() throws Exception {
+		// verify that we check for trigger compatibility
+
+		StreamExecutionEnvironment env = LocalStreamEnvironment.createLocalEnvironment();
+
+		AllWindowedStream<String, TimeWindow> windowedStream = env.fromElements("Hello", "Ciao")
+				.windowAll(EventTimeSessionWindows.withGap(Time.seconds(5)));
+
+		try {
+			windowedStream.trigger(new Trigger<String, TimeWindow>() {
+				@Override
+				public TriggerResult onElement(String element,
+						long timestamp,
+						TimeWindow window,
+						TriggerContext ctx) throws Exception {
+					return null;
+				}
+
+				@Override
+				public TriggerResult onProcessingTime(long time,
+						TimeWindow window,
+						TriggerContext ctx) throws Exception {
+					return null;
+				}
+
+				@Override
+				public TriggerResult onEventTime(long time,
+						TimeWindow window,
+						TriggerContext ctx) throws Exception {
+					return null;
+				}
+
+				@Override
+				public boolean canMerge() {
+					return false;
+				}
+			});
+		} catch (UnsupportedOperationException e) {
+			// expected
+			// use a catch to ensure that the exception is thrown by the fold
+			return;
+		}
+
+		fail("The trigger call should fail.");
+
+		env.execute();
+
+	}
+
 	// ------------------------------------------------------------------------
 	//  UDFs
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/6cd8ceb1/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSetTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSetTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSetTest.java
new file mode 100644
index 0000000..cf90f8a
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSetTest.java
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators.windowing;
+
+import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.junit.Test;
+
+import java.util.Collection;
+
+import static org.hamcrest.CoreMatchers.anyOf;
+import static org.hamcrest.CoreMatchers.hasItem;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.core.Is.is;
+import static org.hamcrest.core.IsNot.not;
+import static org.junit.Assert.*;
+
+/**
+ * Tests for verifying that {@link MergingWindowSet} correctly merges windows in various situations
+ * and that the merge callback is called with the correct sets of windows.
+ */
+public class MergingWindowSetTest {
+
+	@Test
+	public void testIncrementalMerging() throws Exception {
+		MergingWindowSet<TimeWindow> windowSet = new MergingWindowSet<>(EventTimeSessionWindows.withGap(Time.milliseconds(3)));
+
+		TestingMergeFunction mergeFunction = new TestingMergeFunction();
+
+		// add initial window
+		mergeFunction.reset();
+		assertEquals(new TimeWindow(0, 4), windowSet.addWindow(new TimeWindow(0, 4), mergeFunction));
+		assertFalse(mergeFunction.hasMerged());
+
+		assertTrue(windowSet.getStateWindow(new TimeWindow(0, 4)).equals(new TimeWindow(0, 4)));
+
+		// add some more windows
+		mergeFunction.reset();
+		assertEquals(new TimeWindow(0, 4), windowSet.addWindow(new TimeWindow(0, 4), mergeFunction));
+		assertFalse(mergeFunction.hasMerged());
+
+		mergeFunction.reset();
+		assertEquals(new TimeWindow(0, 5), windowSet.addWindow(new TimeWindow(3, 5), mergeFunction));
+		assertTrue(mergeFunction.hasMerged());
+		assertEquals(new TimeWindow(0, 5), mergeFunction.mergeTarget());
+		assertEquals(new TimeWindow(0, 4), mergeFunction.stateWindow());
+		assertThat(mergeFunction.mergeSources(), containsInAnyOrder(new TimeWindow(0, 4)));
+		assertTrue(mergeFunction.mergedStateWindows().isEmpty());
+
+		mergeFunction.reset();
+		assertEquals(new TimeWindow(0, 6), windowSet.addWindow(new TimeWindow(4, 6), mergeFunction));
+		assertTrue(mergeFunction.hasMerged());
+		assertEquals(new TimeWindow(0, 6), mergeFunction.mergeTarget());
+		assertEquals(new TimeWindow(0, 4), mergeFunction.stateWindow());
+		assertThat(mergeFunction.mergeSources(), containsInAnyOrder(new TimeWindow(0, 5)));
+		assertTrue(mergeFunction.mergedStateWindows().isEmpty());
+
+		assertEquals(new TimeWindow(0, 4), windowSet.getStateWindow(new TimeWindow(0, 6)));
+
+		// add some windows that falls into the already merged region
+		mergeFunction.reset();
+		assertEquals(new TimeWindow(0, 6), windowSet.addWindow(new TimeWindow(1, 4), mergeFunction));
+		assertFalse(mergeFunction.hasMerged());
+
+		mergeFunction.reset();
+		assertEquals(new TimeWindow(0, 6), windowSet.addWindow(new TimeWindow(0, 4), mergeFunction));
+		assertFalse(mergeFunction.hasMerged());
+
+		mergeFunction.reset();
+		assertEquals(new TimeWindow(0, 6), windowSet.addWindow(new TimeWindow(3, 5), mergeFunction));
+		assertFalse(mergeFunction.hasMerged());
+
+		mergeFunction.reset();
+		assertEquals(new TimeWindow(0, 6), windowSet.addWindow(new TimeWindow(4, 6), mergeFunction));
+		assertFalse(mergeFunction.hasMerged());
+
+		assertEquals(new TimeWindow(0, 4), windowSet.getStateWindow(new TimeWindow(0, 6)));
+
+		// add some more windows that don't merge with the first bunch
+		mergeFunction.reset();
+		assertEquals(new TimeWindow(11, 14), windowSet.addWindow(new TimeWindow(11, 14), mergeFunction));
+		assertFalse(mergeFunction.hasMerged());
+
+
+		assertEquals(new TimeWindow(0, 4), windowSet.getStateWindow(new TimeWindow(0, 6)));
+
+		assertEquals(new TimeWindow(11, 14), windowSet.getStateWindow(new TimeWindow(11, 14)));
+
+		// add some more windows that merge with the second bunch
+
+		mergeFunction.reset();
+		assertEquals(new TimeWindow(10, 14), windowSet.addWindow(new TimeWindow(10, 13), mergeFunction));
+		assertTrue(mergeFunction.hasMerged());
+		assertEquals(new TimeWindow(10, 14), mergeFunction.mergeTarget());
+		assertEquals(new TimeWindow(11, 14), mergeFunction.stateWindow());
+		assertThat(mergeFunction.mergeSources(), containsInAnyOrder(new TimeWindow(11, 14)));
+		assertTrue(mergeFunction.mergedStateWindows().isEmpty());
+
+		mergeFunction.reset();
+		assertEquals(new TimeWindow(10, 15), windowSet.addWindow(new TimeWindow(12, 15), mergeFunction));
+		assertTrue(mergeFunction.hasMerged());
+		assertEquals(new TimeWindow(10, 15), mergeFunction.mergeTarget());
+		assertEquals(new TimeWindow(11, 14), mergeFunction.stateWindow());
+		assertThat(mergeFunction.mergeSources(), containsInAnyOrder(new TimeWindow(10, 14)));
+		assertTrue(mergeFunction.mergedStateWindows().isEmpty());
+
+		mergeFunction.reset();
+		assertEquals(new TimeWindow(10, 15), windowSet.addWindow(new TimeWindow(11, 14), mergeFunction));
+		assertFalse(mergeFunction.hasMerged());
+
+
+		assertEquals(new TimeWindow(0, 4), windowSet.getStateWindow(new TimeWindow(0, 6)));
+
+		assertEquals(new TimeWindow(11, 14), windowSet.getStateWindow(new TimeWindow(10, 15)));
+
+		// retire the first batch of windows
+		windowSet.retireWindow(new TimeWindow(0, 6));
+
+		try {
+			windowSet.getStateWindow(new TimeWindow(0, 6));
+			fail("Expected exception");
+		} catch (IllegalStateException e) {
+			//ignore
+		}
+
+		assertTrue(windowSet.getStateWindow(new TimeWindow(10, 15)).equals(new TimeWindow(11, 14)));
+	}
+
+	@Test
+	public void testLateMerging() throws Exception {
+		MergingWindowSet<TimeWindow> windowSet = new MergingWindowSet<>(EventTimeSessionWindows.withGap(Time.milliseconds(3)));
+
+		TestingMergeFunction mergeFunction = new TestingMergeFunction();
+
+		// add several non-overlapping initial windoww
+
+		mergeFunction.reset();
+		assertEquals(new TimeWindow(0, 3), windowSet.addWindow(new TimeWindow(0, 3), mergeFunction));
+		assertFalse(mergeFunction.hasMerged());
+		assertEquals(new TimeWindow(0, 3), windowSet.getStateWindow(new TimeWindow(0, 3)));
+
+		mergeFunction.reset();
+		assertEquals(new TimeWindow(5, 8), windowSet.addWindow(new TimeWindow(5, 8), mergeFunction));
+		assertFalse(mergeFunction.hasMerged());
+		assertEquals(new TimeWindow(5, 8), windowSet.getStateWindow(new TimeWindow(5, 8)));
+
+		mergeFunction.reset();
+		assertEquals(new TimeWindow(10, 13), windowSet.addWindow(new TimeWindow(10, 13), mergeFunction));
+		assertFalse(mergeFunction.hasMerged());
+		assertEquals(new TimeWindow(10, 13), windowSet.getStateWindow(new TimeWindow(10, 13)));
+
+		// add a window that merges the later two windows
+		mergeFunction.reset();
+		assertEquals(new TimeWindow(5, 13), windowSet.addWindow(new TimeWindow(8, 10), mergeFunction));
+		assertTrue(mergeFunction.hasMerged());
+		assertEquals(new TimeWindow(5, 13), mergeFunction.mergeTarget());
+		assertThat(mergeFunction.stateWindow(), anyOf(is(new TimeWindow(5, 8)), is(new TimeWindow(10, 13))));
+		assertThat(mergeFunction.mergeSources(), containsInAnyOrder(new TimeWindow(5, 8), new TimeWindow(10, 13)));
+		assertThat(mergeFunction.mergedStateWindows(), anyOf(
+				containsInAnyOrder(new TimeWindow(10, 13)),
+				containsInAnyOrder(new TimeWindow(5, 8))));
+		assertThat(mergeFunction.mergedStateWindows(), not(hasItem(mergeFunction.mergeTarget())));
+
+		assertEquals(new TimeWindow(0, 3), windowSet.getStateWindow(new TimeWindow(0, 3)));
+
+		mergeFunction.reset();
+		assertEquals(new TimeWindow(5, 13), windowSet.addWindow(new TimeWindow(5, 8), mergeFunction));
+		assertFalse(mergeFunction.hasMerged());
+
+		mergeFunction.reset();
+		assertEquals(new TimeWindow(5, 13), windowSet.addWindow(new TimeWindow(8, 10), mergeFunction));
+		assertFalse(mergeFunction.hasMerged());
+
+		mergeFunction.reset();
+		assertEquals(new TimeWindow(5, 13), windowSet.addWindow(new TimeWindow(10, 13), mergeFunction));
+		assertFalse(mergeFunction.hasMerged());
+
+		assertThat(windowSet.getStateWindow(new TimeWindow(5, 13)), anyOf(is(new TimeWindow(5, 8)), is(new TimeWindow(10, 13))));
+
+		// add a window that merges all of them together
+		mergeFunction.reset();
+		assertEquals(new TimeWindow(0, 13), windowSet.addWindow(new TimeWindow(3, 5), mergeFunction));
+		assertTrue(mergeFunction.hasMerged());
+		assertEquals(new TimeWindow(0, 13), mergeFunction.mergeTarget());
+		assertThat(mergeFunction.stateWindow(), anyOf(is(new TimeWindow(0, 3)), is(new TimeWindow(5, 8)), is(new TimeWindow(10, 13))));
+		assertThat(mergeFunction.mergeSources(), containsInAnyOrder(new TimeWindow(0, 3), new TimeWindow(5, 13)));
+		assertThat(mergeFunction.mergedStateWindows(), anyOf(
+				containsInAnyOrder(new TimeWindow(0, 3)),
+				containsInAnyOrder(new TimeWindow(5, 8)),
+				containsInAnyOrder(new TimeWindow(10, 13))));
+		assertThat(mergeFunction.mergedStateWindows(), not(hasItem(mergeFunction.mergeTarget())));
+
+		assertThat(windowSet.getStateWindow(new TimeWindow(0, 13)), anyOf(is(new TimeWindow(0, 3)), is(new TimeWindow(5, 8)), is(new TimeWindow(10, 13))));
+	}
+
+	private static class TestingMergeFunction implements MergingWindowSet.MergeFunction<TimeWindow> {
+		private TimeWindow target = null;
+		private Collection<TimeWindow> sources = null;
+
+		private TimeWindow stateWindow = null;
+		private Collection<TimeWindow> mergedStateWindows = null;
+
+		public void reset() {
+			target = null;
+			sources = null;
+			stateWindow = null;
+			mergedStateWindows = null;
+		}
+
+		public boolean hasMerged() {
+			return target != null;
+		}
+
+		public TimeWindow mergeTarget() {
+			return target;
+		}
+
+		public Collection<TimeWindow> mergeSources() {
+			return sources;
+		}
+
+		public TimeWindow stateWindow() {
+			return stateWindow;
+		}
+
+		public Collection<TimeWindow> mergedStateWindows() {
+			return mergedStateWindows;
+		}
+
+		@Override
+		public void merge(TimeWindow mergeResult,
+				Collection<TimeWindow> mergedWindows,
+				TimeWindow stateWindowResult,
+				Collection<TimeWindow> mergedStateWindows) throws Exception {
+			if (target != null) {
+				fail("More than one merge for adding a Window should not occur.");
+			}
+			this.stateWindow = stateWindowResult;
+			this.target = mergeResult;
+			this.mergedStateWindows = mergedStateWindows;
+			this.sources = mergedWindows;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/6cd8ceb1/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
index 3277940..642a16b 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
@@ -18,6 +18,7 @@
 package org.apache.flink.streaming.runtime.operators.windowing;
 
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.FoldFunction;
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.ReducingStateDescriptor;
@@ -25,14 +26,21 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.typeutils.TypeInfoParser;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.WindowedStream;
+import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.windowing.PassThroughWindowFunction;
 import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
+import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
+import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
 import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
 import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
+import org.apache.flink.streaming.api.windowing.evictors.CountEvictor;
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.api.windowing.windows.Window;
 import org.apache.flink.streaming.api.windowing.triggers.ContinuousEventTimeTrigger;
@@ -50,11 +58,15 @@ import org.apache.flink.util.Collector;
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.util.Collection;
+import java.util.Collections;
 import java.util.Comparator;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import static org.junit.Assert.fail;
+
 public class WindowOperatorTest {
 
 	// For counting if close() is called the correct number of times on the SumReducer
@@ -82,35 +94,35 @@ public class WindowOperatorTest {
 		testHarness.processWatermark(new Watermark(initialTime + 999));
 		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 3), initialTime + 999));
 		expectedOutput.add(new Watermark(999));
-		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
+		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator());
 
 
 		testHarness.processWatermark(new Watermark(initialTime + 1999));
 		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 3), initialTime + 1999));
 		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 3), initialTime + 1999));
 		expectedOutput.add(new Watermark(1999));
-		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
+		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator());
 
 		testHarness.processWatermark(new Watermark(initialTime + 2999));
 		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 3), initialTime + 2999));
 		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 3), initialTime + 2999));
 		expectedOutput.add(new Watermark(2999));
-		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
+		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator());
 
 		testHarness.processWatermark(new Watermark(initialTime + 3999));
 		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 5), initialTime + 3999));
 		expectedOutput.add(new Watermark(3999));
-		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
+		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator());
 
 		testHarness.processWatermark(new Watermark(initialTime + 4999));
 		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 2), initialTime + 4999));
 		expectedOutput.add(new Watermark(4999));
-		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
+		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator());
 
 		testHarness.processWatermark(new Watermark(initialTime + 5999));
 		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 2), initialTime + 5999));
 		expectedOutput.add(new Watermark(5999));
-		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
+		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator());
 
 
 		// those don't have any effect...
@@ -119,7 +131,7 @@ public class WindowOperatorTest {
 		expectedOutput.add(new Watermark(6999));
 		expectedOutput.add(new Watermark(7999));
 
-		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
+		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator());
 	}
 
 	@Test
@@ -218,31 +230,31 @@ public class WindowOperatorTest {
 
 		testHarness.processWatermark(new Watermark(initialTime + 999));
 		expectedOutput.add(new Watermark(999));
-		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
+		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator());
 
 
 		testHarness.processWatermark(new Watermark(initialTime + 1999));
 		expectedOutput.add(new Watermark(1999));
-		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
+		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator());
 
 		testHarness.processWatermark(new Watermark(initialTime + 2999));
 		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 3), initialTime + 2999));
 		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 3), initialTime + 2999));
 		expectedOutput.add(new Watermark(2999));
-		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
+		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator());
 
 		testHarness.processWatermark(new Watermark(initialTime + 3999));
 		expectedOutput.add(new Watermark(3999));
-		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
+		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator());
 
 		testHarness.processWatermark(new Watermark(initialTime + 4999));
 		expectedOutput.add(new Watermark(4999));
-		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
+		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator());
 
 		testHarness.processWatermark(new Watermark(initialTime + 5999));
 		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 2), initialTime + 5999));
 		expectedOutput.add(new Watermark(5999));
-		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
+		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator());
 
 
 		// those don't have any effect...
@@ -251,7 +263,7 @@ public class WindowOperatorTest {
 		expectedOutput.add(new Watermark(6999));
 		expectedOutput.add(new Watermark(7999));
 
-		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
+		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator());
 	}
 
 		@Test
@@ -329,6 +341,294 @@ public class WindowOperatorTest {
 
 	@Test
 	@SuppressWarnings("unchecked")
+	public void testSessionWindows() throws Exception {
+		closeCalled.set(0);
+
+		final int SESSION_SIZE = 3;
+
+		TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");
+
+		ListStateDescriptor<Tuple2<String, Integer>> stateDesc = new ListStateDescriptor<>("window-contents",
+				inputType.createSerializer(new ExecutionConfig()));
+
+		WindowOperator<String, Tuple2<String, Integer>, Iterable<Tuple2<String, Integer>>, Tuple3<String, Long, Long>, TimeWindow> operator = new WindowOperator<>(
+				EventTimeSessionWindows.withGap(Time.seconds(SESSION_SIZE)),
+				new TimeWindow.Serializer(),
+				new TupleKeySelector(),
+				BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
+				stateDesc,
+				new InternalIterableWindowFunction<>(new SessionWindowFunction()),
+				EventTimeTrigger.create());
+
+		operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig());
+
+		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness =
+				new OneInputStreamOperatorTestHarness<>(operator);
+
+		testHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
+
+		long initialTime = 0L;
+		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+
+		testHarness.open();
+
+		// add elements out-of-order
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 0));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 2), initialTime + 1000));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 3), initialTime + 2500));
+
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 10));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 2), initialTime + 1000));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 3), initialTime + 2500));
+
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 4), initialTime + 5501));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 5), initialTime + 6000));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 5), initialTime + 6000));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 6), initialTime + 6050));
+
+		testHarness.processWatermark(new Watermark(initialTime + 12000));
+
+		expectedOutput.add(new StreamRecord<>(new Tuple3<>("key1-6", 10L, 5500L), initialTime + 5499));
+		expectedOutput.add(new StreamRecord<>(new Tuple3<>("key2-6", 0L, 5500L), initialTime + 5499));
+		expectedOutput.add(new StreamRecord<>(new Tuple3<>("key2-20", 5501L, 9050L), initialTime + 9049));
+		expectedOutput.add(new Watermark(initialTime + 12000));
+
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 10), initialTime + 15000));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 20), initialTime + 15000));
+
+		testHarness.processWatermark(new Watermark(initialTime + 17999));
+
+		expectedOutput.add(new StreamRecord<>(new Tuple3<>("key2-30", 15000L, 18000L), initialTime + 17999));
+		expectedOutput.add(new Watermark(initialTime + 17999));
+
+		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple3ResultSortComparator());
+
+		testHarness.close();
+	}
+
+	@Test
+	@SuppressWarnings("unchecked")
+	public void testReduceSessionWindows() throws Exception {
+		closeCalled.set(0);
+
+		final int SESSION_SIZE = 3;
+
+		TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");
+
+		ReducingStateDescriptor<Tuple2<String, Integer>> stateDesc = new ReducingStateDescriptor<>(
+				"window-contents", new SumReducer(), inputType.createSerializer(new ExecutionConfig()));
+
+		WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple3<String, Long, Long>, TimeWindow> operator = new WindowOperator<>(
+				EventTimeSessionWindows.withGap(Time.seconds(SESSION_SIZE)),
+				new TimeWindow.Serializer(),
+				new TupleKeySelector(),
+				BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
+				stateDesc,
+				new InternalSingleValueWindowFunction<>(new ReducedSessionWindowFunction()),
+				EventTimeTrigger.create());
+
+		operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig());
+
+		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness =
+				new OneInputStreamOperatorTestHarness<>(operator);
+
+		testHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
+
+		long initialTime = 0L;
+		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+
+		testHarness.open();
+
+		// add elements out-of-order
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 0));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 2), initialTime + 1000));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 3), initialTime + 2500));
+
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 10));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 2), initialTime + 1000));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 3), initialTime + 2500));
+
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 4), initialTime + 5501));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 5), initialTime + 6000));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 5), initialTime + 6000));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 6), initialTime + 6050));
+
+		testHarness.processWatermark(new Watermark(initialTime + 12000));
+
+		expectedOutput.add(new StreamRecord<>(new Tuple3<>("key1-6", 10L, 5500L), initialTime + 5499));
+		expectedOutput.add(new StreamRecord<>(new Tuple3<>("key2-6", 0L, 5500L), initialTime + 5499));
+		expectedOutput.add(new StreamRecord<>(new Tuple3<>("key2-20", 5501L, 9050L), initialTime + 9049));
+		expectedOutput.add(new Watermark(initialTime + 12000));
+
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 10), initialTime + 15000));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 20), initialTime + 15000));
+
+		testHarness.processWatermark(new Watermark(initialTime + 17999));
+
+		expectedOutput.add(new StreamRecord<>(new Tuple3<>("key2-30", 15000L, 18000L), initialTime + 17999));
+		expectedOutput.add(new Watermark(initialTime + 17999));
+
+		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple3ResultSortComparator());
+
+		testHarness.close();
+	}
+
+	/**
+	 * This tests whether merging works correctly with the CountTrigger.
+	 * @throws Exception
+	 */
+	@Test
+	@SuppressWarnings("unchecked")
+	public void testSessionWindowsWithCountTrigger() throws Exception {
+		closeCalled.set(0);
+
+		final int SESSION_SIZE = 3;
+
+		TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");
+
+		ListStateDescriptor<Tuple2<String, Integer>> stateDesc = new ListStateDescriptor<>("window-contents",
+				inputType.createSerializer(new ExecutionConfig()));
+
+		WindowOperator<String, Tuple2<String, Integer>, Iterable<Tuple2<String, Integer>>, Tuple3<String, Long, Long>, TimeWindow> operator = new WindowOperator<>(
+				EventTimeSessionWindows.withGap(Time.seconds(SESSION_SIZE)),
+				new TimeWindow.Serializer(),
+				new TupleKeySelector(),
+				BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
+				stateDesc,
+				new InternalIterableWindowFunction<>(new SessionWindowFunction()),
+				PurgingTrigger.of(CountTrigger.of(4)));
+
+		operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig());
+
+		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness =
+				new OneInputStreamOperatorTestHarness<>(operator);
+
+		testHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
+
+		long initialTime = 0L;
+		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+
+		testHarness.open();
+
+		// add elements out-of-order
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 0));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 2), initialTime + 1000));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 3), initialTime + 2500));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 4), initialTime + 3500));
+
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 10));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 2), initialTime + 1000));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 3), initialTime + 2500));
+
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 6000));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 2), initialTime + 6500));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 3), initialTime + 7000));
+
+
+		expectedOutput.add(new StreamRecord<>(new Tuple3<>("key2-10", 0L, 6500L), initialTime + 6499));
+
+		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple3ResultSortComparator());
+
+		// add an element that merges the two "key1" sessions, they should now have count 6, and therfore fire
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 10), initialTime + 4500));
+
+		expectedOutput.add(new StreamRecord<>(new Tuple3<>("key1-22", 10L, 10000L), initialTime + 9999L));
+
+		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple3ResultSortComparator());
+
+		testHarness.close();
+	}
+
+	@Test
+	public void testMergeAndEvictor() throws Exception {
+		// verify that merging WindowAssigner and Evictor cannot be used together
+
+		StreamExecutionEnvironment env = LocalStreamEnvironment.createLocalEnvironment();
+
+		WindowedStream<String, String, TimeWindow> windowedStream = env.fromElements("Hello", "Ciao")
+				.keyBy(new KeySelector<String, String>() {
+					@Override
+					public String getKey(String value) throws Exception {
+						return value;
+					}
+				})
+				.window(EventTimeSessionWindows.withGap(Time.seconds(5)));
+
+		try {
+			windowedStream.evictor(CountEvictor.of(13));
+
+		} catch (UnsupportedOperationException e) {
+			// expected
+			// use a catch to ensure that the exception is thrown by the fold
+			return;
+		}
+
+		fail("The evictor call should fail.");
+
+		env.execute();
+
+	}
+
+	@Test
+	@SuppressWarnings("unchecked")
+	/**
+	 * This tests a custom Session window assigner that assigns some elements to "point windows",
+	 * windows that have the same timestamp for start and end.
+	 *
+	 * <p> In this test, elements that have 33 as the second tuple field will be put into a point
+	 * window.
+	 */
+	public void testPointSessions() throws Exception {
+		closeCalled.set(0);
+
+		TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");
+
+		ListStateDescriptor<Tuple2<String, Integer>> stateDesc = new ListStateDescriptor<>("window-contents",
+				inputType.createSerializer(new ExecutionConfig()));
+
+		WindowOperator<String, Tuple2<String, Integer>, Iterable<Tuple2<String, Integer>>, Tuple3<String, Long, Long>, TimeWindow> operator = new WindowOperator<>(
+				new PointSessionWindows(3000),
+				new TimeWindow.Serializer(),
+				new TupleKeySelector(),
+				BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
+				stateDesc,
+				new InternalIterableWindowFunction<>(new SessionWindowFunction()),
+				EventTimeTrigger.create());
+
+		operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig());
+
+		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness =
+				new OneInputStreamOperatorTestHarness<>(operator);
+
+		testHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
+
+		long initialTime = 0L;
+		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+
+		testHarness.open();
+
+		// add elements out-of-order
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 0));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 33), initialTime + 1000));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 33), initialTime + 2500));
+
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 10));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 2), initialTime + 1000));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 33), initialTime + 2500));
+
+		testHarness.processWatermark(new Watermark(initialTime + 12000));
+
+		expectedOutput.add(new StreamRecord<>(new Tuple3<>("key1-36", 10L, 4000L), initialTime + 3999));
+		expectedOutput.add(new StreamRecord<>(new Tuple3<>("key2-67", 0L, 3000L), initialTime + 2999));
+		expectedOutput.add(new Watermark(initialTime + 12000));
+
+		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple3ResultSortComparator());
+
+		testHarness.close();
+	}
+
+	@Test
+	@SuppressWarnings("unchecked")
 	public void testContinuousWatermarkTrigger() throws Exception {
 		closeCalled.set(0);
 
@@ -379,31 +679,31 @@ public class WindowOperatorTest {
 
 		testHarness.processWatermark(new Watermark(initialTime + 1000));
 		expectedOutput.add(new Watermark(1000));
-		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
+		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator());
 
 
 		testHarness.processWatermark(new Watermark(initialTime + 2000));
 		expectedOutput.add(new Watermark(2000));
-		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
+		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator());
 
 		testHarness.processWatermark(new Watermark(initialTime + 3000));
 		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 3), Long.MAX_VALUE));
 		expectedOutput.add(new Watermark(3000));
-		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
+		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator());
 
 		testHarness.processWatermark(new Watermark(initialTime + 4000));
 		expectedOutput.add(new Watermark(4000));
-		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
+		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator());
 
 		testHarness.processWatermark(new Watermark(initialTime + 5000));
 		expectedOutput.add(new Watermark(5000));
-		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
+		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator());
 
 		testHarness.processWatermark(new Watermark(initialTime + 6000));
 		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 3), Long.MAX_VALUE));
 		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 5), Long.MAX_VALUE));
 		expectedOutput.add(new Watermark(6000));
-		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
+		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator());
 
 
 		// those don't have any effect...
@@ -412,7 +712,7 @@ public class WindowOperatorTest {
 		expectedOutput.add(new Watermark(7000));
 		expectedOutput.add(new Watermark(8000));
 
-		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
+		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator());
 
 		testHarness.close();
 	}
@@ -469,7 +769,7 @@ public class WindowOperatorTest {
 
 		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 4), Long.MAX_VALUE));
 
-		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
+		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator());
 
 		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 10999));
 
@@ -480,7 +780,7 @@ public class WindowOperatorTest {
 		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 4), Long.MAX_VALUE));
 		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 4), Long.MAX_VALUE));
 
-		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
+		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator());
 
 		testHarness.close();
 	}
@@ -523,7 +823,7 @@ public class WindowOperatorTest {
 			Collector<Tuple2<String, Integer>> out) throws Exception {
 
 			if (!openCalled) {
-				Assert.fail("Open was not called");
+				fail("Open was not called");
 			}
 			int sum = 0;
 
@@ -537,7 +837,7 @@ public class WindowOperatorTest {
 	}
 
 	@SuppressWarnings("unchecked")
-	private static class ResultSortComparator implements Comparator<Object> {
+	private static class Tuple2ResultSortComparator implements Comparator<Object> {
 		@Override
 		public int compare(Object o1, Object o2) {
 			if (o1 instanceof Watermark || o2 instanceof Watermark) {
@@ -558,6 +858,32 @@ public class WindowOperatorTest {
 		}
 	}
 
+	@SuppressWarnings("unchecked")
+	private static class Tuple3ResultSortComparator implements Comparator<Object> {
+		@Override
+		public int compare(Object o1, Object o2) {
+			if (o1 instanceof Watermark || o2 instanceof Watermark) {
+				return 0;
+			} else {
+				StreamRecord<Tuple3<String, Long, Long>> sr0 = (StreamRecord<Tuple3<String, Long, Long>>) o1;
+				StreamRecord<Tuple3<String, Long, Long>> sr1 = (StreamRecord<Tuple3<String, Long, Long>>) o2;
+				if (sr0.getTimestamp() != sr1.getTimestamp()) {
+					return (int) (sr0.getTimestamp() - sr1.getTimestamp());
+				}
+				int comparison = sr0.getValue().f0.compareTo(sr1.getValue().f0);
+				if (comparison != 0) {
+					return comparison;
+				} else {
+					comparison = (int) (sr0.getValue().f1 - sr1.getValue().f1);
+					if (comparison != 0) {
+						return comparison;
+					}
+					return (int) (sr0.getValue().f1 - sr1.getValue().f1);
+				}
+			}
+		}
+	}
+
 	private static class TupleKeySelector implements KeySelector<Tuple2<String, Integer>, String> {
 		private static final long serialVersionUID = 1L;
 
@@ -566,4 +892,57 @@ public class WindowOperatorTest {
 			return value.f0;
 		}
 	}
+
+	public static class SessionWindowFunction implements WindowFunction<Tuple2<String, Integer>, Tuple3<String, Long, Long>, String, TimeWindow> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public void apply(String key,
+				TimeWindow window,
+				Iterable<Tuple2<String, Integer>> values,
+				Collector<Tuple3<String, Long, Long>> out) throws Exception {
+			int sum = 0;
+			for (Tuple2<String, Integer> i: values) {
+				sum += i.f1;
+			}
+			String resultString = key + "-" + sum;
+			out.collect(new Tuple3<>(resultString, window.getStart(), window.getEnd()));
+		}
+	}
+
+	public static class ReducedSessionWindowFunction implements WindowFunction<Tuple2<String, Integer>, Tuple3<String, Long, Long>, String, TimeWindow> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public void apply(String key,
+				TimeWindow window,
+				Iterable<Tuple2<String, Integer>> values,
+				Collector<Tuple3<String, Long, Long>> out) throws Exception {
+			for (Tuple2<String, Integer> val: values) {
+				out.collect(new Tuple3<>(key + "-" + val.f1, window.getStart(), window.getEnd()));
+			}
+		}
+	}
+
+
+	public static class PointSessionWindows extends EventTimeSessionWindows {
+		private static final long serialVersionUID = 1L;
+
+
+		private PointSessionWindows(long sessionTimeout) {
+			super(sessionTimeout);
+		}
+
+		@Override
+		@SuppressWarnings("unchecked")
+		public Collection<TimeWindow> assignWindows(Object element, long timestamp) {
+			if (element instanceof Tuple2) {
+				Tuple2<String, Integer> t2 = (Tuple2<String, Integer>) element;
+				if (t2.f1 == 33) {
+					return Collections.singletonList(new TimeWindow(timestamp, timestamp));
+				}
+			}
+			return Collections.singletonList(new TimeWindow(timestamp, timestamp + sessionTimeout));
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/6cd8ceb1/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java
index cacfc26..cb3801e 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java
@@ -17,19 +17,23 @@
  */
 package org.apache.flink.streaming.runtime.operators.windowing;
 
+import org.apache.flink.api.common.functions.FoldFunction;
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.functions.RichReduceFunction;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.WindowedStream;
+import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.transformations.OneInputTransformation;
+import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
 import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
 import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
 import org.apache.flink.streaming.api.windowing.evictors.CountEvictor;
@@ -37,6 +41,8 @@ import org.apache.flink.streaming.api.windowing.evictors.TimeEvictor;
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
 import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
 import org.apache.flink.util.Collector;
@@ -45,6 +51,8 @@ import org.junit.Test;
 
 import java.util.concurrent.TimeUnit;
 
+import static org.junit.Assert.fail;
+
 /**
  * These tests verify that the api calls on
  * {@link WindowedStream} instantiate
@@ -225,6 +233,96 @@ public class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
 		Assert.assertTrue(winOperator2.getStateDescriptor() instanceof ListStateDescriptor);
 	}
 
+	@Test
+	public void testSessionWithFold() throws Exception {
+		// verify that fold does not work with merging windows
+
+		StreamExecutionEnvironment env = LocalStreamEnvironment.createLocalEnvironment();
+
+		WindowedStream<String, String, TimeWindow> windowedStream = env.fromElements("Hello", "Ciao")
+				.keyBy(new KeySelector<String, String>() {
+					@Override
+					public String getKey(String value) throws Exception {
+						return value;
+					}
+				})
+				.window(EventTimeSessionWindows.withGap(Time.seconds(5)));
+
+		try {
+			windowedStream.fold("", new FoldFunction<String, String>() {
+				@Override
+				public String fold(String accumulator, String value) throws Exception {
+					return accumulator;
+				}
+			});
+		} catch (UnsupportedOperationException e) {
+			// expected
+			// use a catch to ensure that the exception is thrown by the fold
+			return;
+		}
+
+		fail("The fold call should fail.");
+
+		env.execute();
+	}
+
+	@Test
+	public void testMergingAssignerWithNonMergingTrigger() throws Exception {
+		// verify that we check for trigger compatibility
+
+		StreamExecutionEnvironment env = LocalStreamEnvironment.createLocalEnvironment();
+
+		WindowedStream<String, String, TimeWindow> windowedStream = env.fromElements("Hello", "Ciao")
+				.keyBy(new KeySelector<String, String>() {
+					@Override
+					public String getKey(String value) throws Exception {
+						return value;
+					}
+				})
+				.window(EventTimeSessionWindows.withGap(Time.seconds(5)));
+
+		try {
+			windowedStream.trigger(new Trigger<String, TimeWindow>() {
+				@Override
+				public TriggerResult onElement(String element,
+						long timestamp,
+						TimeWindow window,
+						TriggerContext ctx) throws Exception {
+					return null;
+				}
+
+				@Override
+				public TriggerResult onProcessingTime(long time,
+						TimeWindow window,
+						TriggerContext ctx) throws Exception {
+					return null;
+				}
+
+				@Override
+				public TriggerResult onEventTime(long time,
+						TimeWindow window,
+						TriggerContext ctx) throws Exception {
+					return null;
+				}
+
+				@Override
+				public boolean canMerge() {
+					return false;
+				}
+			});
+		} catch (UnsupportedOperationException e) {
+			// expected
+			// use a catch to ensure that the exception is thrown by the fold
+			return;
+		}
+
+		fail("The trigger call should fail.");
+
+		env.execute();
+	}
+
+
+
 	// ------------------------------------------------------------------------
 	//  UDFs
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/6cd8ceb1/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestHarnessUtil.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestHarnessUtil.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestHarnessUtil.java
index 9d1e674..bc4074f 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestHarnessUtil.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestHarnessUtil.java
@@ -29,6 +29,8 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Queue;
 
+import static org.junit.Assert.assertEquals;
+
 /**
  * Utils for working with the various test harnesses.
  */
@@ -75,6 +77,8 @@ public class TestHarnessUtil {
 	 * Compare the two queues containing operator/task output by converting them to an array first.
 	 */
 	public static void assertOutputEqualsSorted(String message, Queue<Object> expected, Queue<Object> actual, Comparator<Object> comparator) {
+		assertEquals(expected.size(), actual.size());
+
 		// first, compare only watermarks, their position should be deterministic
 		Iterator<Object> exIt = expected.iterator();
 		Iterator<Object> actIt = actual.iterator();
@@ -82,7 +86,7 @@ public class TestHarnessUtil {
 			Object nextEx = exIt.next();
 			Object nextAct = actIt.next();
 			if (nextEx instanceof Watermark) {
-				Assert.assertEquals(nextEx, nextAct);
+				assertEquals(nextEx, nextAct);
 			}
 		}
 


Mime
View raw message