flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ches...@apache.org
Subject [3/3] flink git commit: [FLINK-6702] put the CEP tests' harness.close() calls into a finally block
Date Fri, 26 May 2017 12:10:08 GMT
[FLINK-6702] put the CEP tests' harness.close() calls into a finally block

This closes #3978.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/38c45f80
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/38c45f80
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/38c45f80

Branch: refs/heads/master
Commit: 38c45f8052166b93bfeebe7aed88a16c53a9332a
Parents: d5ab636
Author: Nico Kruber <nico@data-artisans.com>
Authored: Wed May 24 15:57:24 2017 +0200
Committer: zentol <chesnay@apache.org>
Committed: Fri May 26 11:34:24 2017 +0200

----------------------------------------------------------------------
 .../cep/operator/CEPFrom12MigrationTest.java    | 408 +++++++------
 .../cep/operator/CEPMigration11to13Test.java    | 230 +++----
 .../flink/cep/operator/CEPOperatorTest.java     | 608 ++++++++++---------
 .../flink/cep/operator/CEPRescalingTest.java    | 287 +++++----
 4 files changed, 812 insertions(+), 721 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/38c45f80/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPFrom12MigrationTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPFrom12MigrationTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPFrom12MigrationTest.java
index fb05901..0345192 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPFrom12MigrationTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPFrom12MigrationTest.java
@@ -84,22 +84,26 @@ public class CEPFrom12MigrationTest {
 						keySelector,
 						BasicTypeInfo.INT_TYPE_INFO);
 
-		harness.setup();
-		harness.open();
-
-		harness.processElement(new StreamRecord<Event>(startEvent, 1));
-		harness.processElement(new StreamRecord<Event>(new Event(42, "foobar", 1.0), 2));
-		harness.processElement(new StreamRecord<Event>(new SubEvent(42, "barfoo", 1.0, 5.0), 3));
-		harness.processElement(new StreamRecord<Event>(middleEvent1, 2));
-		harness.processElement(new StreamRecord<Event>(middleEvent2, 3));
-
-		harness.processWatermark(new Watermark(5));
-
-		// do snapshot and save to file
-		OperatorStateHandles snapshot = harness.snapshot(0L, 0L);
-		OperatorSnapshotUtil.writeStateHandle(snapshot, "src/test/resources/cep-migration-after-branching-flink1.2-snapshot");
-
-		harness.close();
+		try {
+			harness.setup();
+			harness.open();
+
+			harness.processElement(new StreamRecord<Event>(startEvent, 1));
+			harness.processElement(new StreamRecord<Event>(new Event(42, "foobar", 1.0), 2));
+			harness
+				.processElement(new StreamRecord<Event>(new SubEvent(42, "barfoo", 1.0, 5.0), 3));
+			harness.processElement(new StreamRecord<Event>(middleEvent1, 2));
+			harness.processElement(new StreamRecord<Event>(middleEvent2, 3));
+
+			harness.processWatermark(new Watermark(5));
+
+			// do snapshot and save to file
+			OperatorStateHandles snapshot = harness.snapshot(0L, 0L);
+			OperatorSnapshotUtil.writeStateHandle(snapshot,
+				"src/test/resources/cep-migration-after-branching-flink1.2-snapshot");
+		} finally {
+			harness.close();
+		}
 	}
 
 	@Test
@@ -130,95 +134,101 @@ public class CEPFrom12MigrationTest {
 						keySelector,
 						BasicTypeInfo.INT_TYPE_INFO);
 
-		harness.setup();
-		harness.initializeState(
+		try {
+			harness.setup();
+			harness.initializeState(
 				OperatorSnapshotUtil.readStateHandle(
-						OperatorSnapshotUtil.getResourceFilename("cep-migration-after-branching-flink1.2-snapshot")));
-		harness.open();
+					OperatorSnapshotUtil
+						.getResourceFilename("cep-migration-after-branching-flink1.2-snapshot")));
+			harness.open();
 
-		harness.processElement(new StreamRecord<>(new Event(42, "start", 1.0), 4));
-		harness.processElement(new StreamRecord<>(endEvent, 5));
+			harness.processElement(new StreamRecord<>(new Event(42, "start", 1.0), 4));
+			harness.processElement(new StreamRecord<>(endEvent, 5));
 
-		harness.processWatermark(new Watermark(20));
+			harness.processWatermark(new Watermark(20));
 
-		ConcurrentLinkedQueue<Object> result = harness.getOutput();
+			ConcurrentLinkedQueue<Object> result = harness.getOutput();
 
-		// watermark and 2 results
-		assertEquals(3, result.size());
+			// watermark and 2 results
+			assertEquals(3, result.size());
 
-		Object resultObject1 = result.poll();
-		assertTrue(resultObject1 instanceof StreamRecord);
-		StreamRecord<?> resultRecord1 = (StreamRecord<?>) resultObject1;
-		assertTrue(resultRecord1.getValue() instanceof Map);
+			Object resultObject1 = result.poll();
+			assertTrue(resultObject1 instanceof StreamRecord);
+			StreamRecord<?> resultRecord1 = (StreamRecord<?>) resultObject1;
+			assertTrue(resultRecord1.getValue() instanceof Map);
 
-		Object resultObject2 = result.poll();
-		assertTrue(resultObject2 instanceof StreamRecord);
-		StreamRecord<?> resultRecord2 = (StreamRecord<?>) resultObject2;
-		assertTrue(resultRecord2.getValue() instanceof Map);
+			Object resultObject2 = result.poll();
+			assertTrue(resultObject2 instanceof StreamRecord);
+			StreamRecord<?> resultRecord2 = (StreamRecord<?>) resultObject2;
+			assertTrue(resultRecord2.getValue() instanceof Map);
 
-		@SuppressWarnings("unchecked")
-		Map<String, List<Event>> patternMap1 = (Map<String, List<Event>>) resultRecord1.getValue();
+			@SuppressWarnings("unchecked")
+			Map<String, List<Event>> patternMap1 =
+				(Map<String, List<Event>>) resultRecord1.getValue();
 
-		assertEquals(startEvent, patternMap1.get("start").get(0));
-		assertEquals(middleEvent1, patternMap1.get("middle").get(0));
-		assertEquals(endEvent, patternMap1.get("end").get(0));
+			assertEquals(startEvent, patternMap1.get("start").get(0));
+			assertEquals(middleEvent1, patternMap1.get("middle").get(0));
+			assertEquals(endEvent, patternMap1.get("end").get(0));
 
-		@SuppressWarnings("unchecked")
-		Map<String, List<Event>> patternMap2 = (Map<String, List<Event>>) resultRecord2.getValue();
+			@SuppressWarnings("unchecked")
+			Map<String, List<Event>> patternMap2 =
+				(Map<String, List<Event>>) resultRecord2.getValue();
 
-		assertEquals(startEvent, patternMap2.get("start").get(0));
-		assertEquals(middleEvent2, patternMap2.get("middle").get(0));
-		assertEquals(endEvent, patternMap2.get("end").get(0));
+			assertEquals(startEvent, patternMap2.get("start").get(0));
+			assertEquals(middleEvent2, patternMap2.get("middle").get(0));
+			assertEquals(endEvent, patternMap2.get("end").get(0));
 
-		// and now go for a checkpoint with the new serializers
+			// and now go for a checkpoint with the new serializers
 
-		final Event startEvent1 = new Event(42, "start", 2.0);
-		final SubEvent middleEvent3 = new SubEvent(42, "foo", 1.0, 11.0);
-		final Event endEvent1 = new Event(42, "end", 2.0);
+			final Event startEvent1 = new Event(42, "start", 2.0);
+			final SubEvent middleEvent3 = new SubEvent(42, "foo", 1.0, 11.0);
+			final Event endEvent1 = new Event(42, "end", 2.0);
 
-		harness.processElement(new StreamRecord<Event>(startEvent1, 21));
-		harness.processElement(new StreamRecord<Event>(middleEvent3, 23));
+			harness.processElement(new StreamRecord<Event>(startEvent1, 21));
+			harness.processElement(new StreamRecord<Event>(middleEvent3, 23));
 
-		// simulate snapshot/restore with some elements in internal sorting queue
-		OperatorStateHandles snapshot = harness.snapshot(1L, 1L);
-		harness.close();
+			// simulate snapshot/restore with some elements in internal sorting queue
+			OperatorStateHandles snapshot = harness.snapshot(1L, 1L);
+			harness.close();
 
-		harness = new KeyedOneInputStreamOperatorTestHarness<>(
+			harness = new KeyedOneInputStreamOperatorTestHarness<>(
 				new KeyedCEPPatternOperator<>(
-						Event.createTypeSerializer(),
-						false,
-						IntSerializer.INSTANCE,
-						new NFAFactory(),
-						true),
+					Event.createTypeSerializer(),
+					false,
+					IntSerializer.INSTANCE,
+					new NFAFactory(),
+					true),
 				keySelector,
 				BasicTypeInfo.INT_TYPE_INFO);
 
-		harness.setup();
-		harness.initializeState(snapshot);
-		harness.open();
+			harness.setup();
+			harness.initializeState(snapshot);
+			harness.open();
 
-		harness.processElement(new StreamRecord<>(endEvent1, 25));
+			harness.processElement(new StreamRecord<>(endEvent1, 25));
 
-		harness.processWatermark(new Watermark(50));
+			harness.processWatermark(new Watermark(50));
 
-		result = harness.getOutput();
+			result = harness.getOutput();
 
-		// watermark and the result
-		assertEquals(2, result.size());
+			// watermark and the result
+			assertEquals(2, result.size());
 
-		Object resultObject3 = result.poll();
-		assertTrue(resultObject3 instanceof StreamRecord);
-		StreamRecord<?> resultRecord3 = (StreamRecord<?>) resultObject3;
-		assertTrue(resultRecord3.getValue() instanceof Map);
+			Object resultObject3 = result.poll();
+			assertTrue(resultObject3 instanceof StreamRecord);
+			StreamRecord<?> resultRecord3 = (StreamRecord<?>) resultObject3;
+			assertTrue(resultRecord3.getValue() instanceof Map);
 
-		@SuppressWarnings("unchecked")
-		Map<String, List<Event>> patternMap3 = (Map<String, List<Event>>) resultRecord3.getValue();
+			@SuppressWarnings("unchecked")
+			Map<String, List<Event>> patternMap3 =
+				(Map<String, List<Event>>) resultRecord3.getValue();
 
-		assertEquals(startEvent1, patternMap3.get("start").get(0));
-		assertEquals(middleEvent3, patternMap3.get("middle").get(0));
-		assertEquals(endEvent1, patternMap3.get("end").get(0));
-
-		harness.close();
+			assertEquals(startEvent1, patternMap3.get("start").get(0));
+			assertEquals(middleEvent3, patternMap3.get("middle").get(0));
+			assertEquals(endEvent1, patternMap3.get("end").get(0));
+		} finally {
+			harness.close();
+		}
 	}
 
 	/**
@@ -251,19 +261,23 @@ public class CEPFrom12MigrationTest {
 						keySelector,
 						BasicTypeInfo.INT_TYPE_INFO);
 
-		harness.setup();
-		harness.open();
-		harness.processElement(new StreamRecord<Event>(startEvent1, 1));
-		harness.processElement(new StreamRecord<Event>(new Event(42, "foobar", 1.0), 2));
-		harness.processElement(new StreamRecord<Event>(new SubEvent(42, "barfoo", 1.0, 5.0), 3));
-		harness.processElement(new StreamRecord<Event>(middleEvent1, 2));
-		harness.processWatermark(new Watermark(5));
-
-		// do snapshot and save to file
-		OperatorStateHandles snapshot = harness.snapshot(0L, 0L);
-		OperatorSnapshotUtil.writeStateHandle(snapshot, "src/test/resources/cep-migration-starting-new-pattern-flink1.2-snapshot");
-
-		harness.close();
+		try {
+			harness.setup();
+			harness.open();
+			harness.processElement(new StreamRecord<Event>(startEvent1, 1));
+			harness.processElement(new StreamRecord<Event>(new Event(42, "foobar", 1.0), 2));
+			harness
+				.processElement(new StreamRecord<Event>(new SubEvent(42, "barfoo", 1.0, 5.0), 3));
+			harness.processElement(new StreamRecord<Event>(middleEvent1, 2));
+			harness.processWatermark(new Watermark(5));
+
+			// do snapshot and save to file
+			OperatorStateHandles snapshot = harness.snapshot(0L, 0L);
+			OperatorSnapshotUtil.writeStateHandle(snapshot,
+				"src/test/resources/cep-migration-starting-new-pattern-flink1.2-snapshot");
+		} finally {
+			harness.close();
+		}
 	}
 
 	@Test
@@ -295,108 +309,115 @@ public class CEPFrom12MigrationTest {
 						keySelector,
 						BasicTypeInfo.INT_TYPE_INFO);
 
-		harness.setup();
-		harness.initializeState(
+		try {
+			harness.setup();
+			harness.initializeState(
 				OperatorSnapshotUtil.readStateHandle(
-						OperatorSnapshotUtil.getResourceFilename("cep-migration-starting-new-pattern-flink1.2-snapshot")));
-		harness.open();
+					OperatorSnapshotUtil.getResourceFilename(
+						"cep-migration-starting-new-pattern-flink1.2-snapshot")));
+			harness.open();
 
-		harness.processElement(new StreamRecord<>(startEvent2, 5));
-		harness.processElement(new StreamRecord<Event>(middleEvent2, 6));
-		harness.processElement(new StreamRecord<>(endEvent, 7));
+			harness.processElement(new StreamRecord<>(startEvent2, 5));
+			harness.processElement(new StreamRecord<Event>(middleEvent2, 6));
+			harness.processElement(new StreamRecord<>(endEvent, 7));
 
-		harness.processWatermark(new Watermark(20));
+			harness.processWatermark(new Watermark(20));
 
-		ConcurrentLinkedQueue<Object> result = harness.getOutput();
+			ConcurrentLinkedQueue<Object> result = harness.getOutput();
 
-		// watermark and 3 results
-		assertEquals(4, result.size());
+			// watermark and 3 results
+			assertEquals(4, result.size());
 
-		Object resultObject1 = result.poll();
-		assertTrue(resultObject1 instanceof StreamRecord);
-		StreamRecord<?> resultRecord1 = (StreamRecord<?>) resultObject1;
-		assertTrue(resultRecord1.getValue() instanceof Map);
+			Object resultObject1 = result.poll();
+			assertTrue(resultObject1 instanceof StreamRecord);
+			StreamRecord<?> resultRecord1 = (StreamRecord<?>) resultObject1;
+			assertTrue(resultRecord1.getValue() instanceof Map);
 
-		Object resultObject2 = result.poll();
-		assertTrue(resultObject2 instanceof StreamRecord);
-		StreamRecord<?> resultRecord2 = (StreamRecord<?>) resultObject2;
-		assertTrue(resultRecord2.getValue() instanceof Map);
+			Object resultObject2 = result.poll();
+			assertTrue(resultObject2 instanceof StreamRecord);
+			StreamRecord<?> resultRecord2 = (StreamRecord<?>) resultObject2;
+			assertTrue(resultRecord2.getValue() instanceof Map);
 
-		Object resultObject3 = result.poll();
-		assertTrue(resultObject3 instanceof StreamRecord);
-		StreamRecord<?> resultRecord3 = (StreamRecord<?>) resultObject3;
-		assertTrue(resultRecord3.getValue() instanceof Map);
+			Object resultObject3 = result.poll();
+			assertTrue(resultObject3 instanceof StreamRecord);
+			StreamRecord<?> resultRecord3 = (StreamRecord<?>) resultObject3;
+			assertTrue(resultRecord3.getValue() instanceof Map);
 
-		@SuppressWarnings("unchecked")
-		Map<String, List<Event>> patternMap1 = (Map<String, List<Event>>) resultRecord1.getValue();
+			@SuppressWarnings("unchecked")
+			Map<String, List<Event>> patternMap1 =
+				(Map<String, List<Event>>) resultRecord1.getValue();
 
-		assertEquals(startEvent1, patternMap1.get("start").get(0));
-		assertEquals(middleEvent1, patternMap1.get("middle").get(0));
-		assertEquals(endEvent, patternMap1.get("end").get(0));
+			assertEquals(startEvent1, patternMap1.get("start").get(0));
+			assertEquals(middleEvent1, patternMap1.get("middle").get(0));
+			assertEquals(endEvent, patternMap1.get("end").get(0));
 
-		@SuppressWarnings("unchecked")
-		Map<String, List<Event>> patternMap2 = (Map<String, List<Event>>) resultRecord2.getValue();
+			@SuppressWarnings("unchecked")
+			Map<String, List<Event>> patternMap2 =
+				(Map<String, List<Event>>) resultRecord2.getValue();
 
-		assertEquals(startEvent1, patternMap2.get("start").get(0));
-		assertEquals(middleEvent2, patternMap2.get("middle").get(0));
-		assertEquals(endEvent, patternMap2.get("end").get(0));
+			assertEquals(startEvent1, patternMap2.get("start").get(0));
+			assertEquals(middleEvent2, patternMap2.get("middle").get(0));
+			assertEquals(endEvent, patternMap2.get("end").get(0));
 
-		@SuppressWarnings("unchecked")
-		Map<String, List<Event>> patternMap3 = (Map<String, List<Event>>) resultRecord3.getValue();
+			@SuppressWarnings("unchecked")
+			Map<String, List<Event>> patternMap3 =
+				(Map<String, List<Event>>) resultRecord3.getValue();
 
-		assertEquals(startEvent2, patternMap3.get("start").get(0));
-		assertEquals(middleEvent2, patternMap3.get("middle").get(0));
-		assertEquals(endEvent, patternMap3.get("end").get(0));
+			assertEquals(startEvent2, patternMap3.get("start").get(0));
+			assertEquals(middleEvent2, patternMap3.get("middle").get(0));
+			assertEquals(endEvent, patternMap3.get("end").get(0));
 
-		// and now go for a checkpoint with the new serializers
+			// and now go for a checkpoint with the new serializers
 
-		final Event startEvent3 = new Event(42, "start", 2.0);
-		final SubEvent middleEvent3 = new SubEvent(42, "foo", 1.0, 11.0);
-		final Event endEvent1 = new Event(42, "end", 2.0);
+			final Event startEvent3 = new Event(42, "start", 2.0);
+			final SubEvent middleEvent3 = new SubEvent(42, "foo", 1.0, 11.0);
+			final Event endEvent1 = new Event(42, "end", 2.0);
 
-		harness.processElement(new StreamRecord<Event>(startEvent3, 21));
-		harness.processElement(new StreamRecord<Event>(middleEvent3, 23));
+			harness.processElement(new StreamRecord<Event>(startEvent3, 21));
+			harness.processElement(new StreamRecord<Event>(middleEvent3, 23));
 
-		// simulate snapshot/restore with some elements in internal sorting queue
-		OperatorStateHandles snapshot = harness.snapshot(1L, 1L);
-		harness.close();
+			// simulate snapshot/restore with some elements in internal sorting queue
+			OperatorStateHandles snapshot = harness.snapshot(1L, 1L);
+			harness.close();
 
-		harness = new KeyedOneInputStreamOperatorTestHarness<>(
+			harness = new KeyedOneInputStreamOperatorTestHarness<>(
 				new KeyedCEPPatternOperator<>(
-						Event.createTypeSerializer(),
-						false,
-						IntSerializer.INSTANCE,
-						new NFAFactory(),
-						true),
+					Event.createTypeSerializer(),
+					false,
+					IntSerializer.INSTANCE,
+					new NFAFactory(),
+					true),
 				keySelector,
 				BasicTypeInfo.INT_TYPE_INFO);
 
-		harness.setup();
-		harness.initializeState(snapshot);
-		harness.open();
-
-		harness.processElement(new StreamRecord<>(endEvent1, 25));
+			harness.setup();
+			harness.initializeState(snapshot);
+			harness.open();
 
-		harness.processWatermark(new Watermark(50));
+			harness.processElement(new StreamRecord<>(endEvent1, 25));
 
-		result = harness.getOutput();
+			harness.processWatermark(new Watermark(50));
 
-		// watermark and the result
-		assertEquals(2, result.size());
+			result = harness.getOutput();
 
-		Object resultObject4 = result.poll();
-		assertTrue(resultObject4 instanceof StreamRecord);
-		StreamRecord<?> resultRecord4 = (StreamRecord<?>) resultObject4;
-		assertTrue(resultRecord4.getValue() instanceof Map);
+			// watermark and the result
+			assertEquals(2, result.size());
 
-		@SuppressWarnings("unchecked")
-		Map<String, List<Event>> patternMap4 = (Map<String, List<Event>>) resultRecord4.getValue();
+			Object resultObject4 = result.poll();
+			assertTrue(resultObject4 instanceof StreamRecord);
+			StreamRecord<?> resultRecord4 = (StreamRecord<?>) resultObject4;
+			assertTrue(resultRecord4.getValue() instanceof Map);
 
-		assertEquals(startEvent3, patternMap4.get("start").get(0));
-		assertEquals(middleEvent3, patternMap4.get("middle").get(0));
-		assertEquals(endEvent1, patternMap4.get("end").get(0));
+			@SuppressWarnings("unchecked")
+			Map<String, List<Event>> patternMap4 =
+				(Map<String, List<Event>>) resultRecord4.getValue();
 
-		harness.close();
+			assertEquals(startEvent3, patternMap4.get("start").get(0));
+			assertEquals(middleEvent3, patternMap4.get("middle").get(0));
+			assertEquals(endEvent1, patternMap4.get("end").get(0));
+		} finally {
+			harness.close();
+		}
 	}
 
 	/**
@@ -428,15 +449,18 @@ public class CEPFrom12MigrationTest {
 						keySelector,
 						BasicTypeInfo.INT_TYPE_INFO);
 
-		harness.setup();
-		harness.open();
-		harness.processWatermark(new Watermark(5));
-
-		// do snapshot and save to file
-		OperatorStateHandles snapshot = harness.snapshot(0L, 0L);
-		OperatorSnapshotUtil.writeStateHandle(snapshot, "src/test/resources/cep-migration-single-pattern-afterwards-flink1.2-snapshot");
-
-		harness.close();
+		try {
+			harness.setup();
+			harness.open();
+			harness.processWatermark(new Watermark(5));
+
+			// do snapshot and save to file
+			OperatorStateHandles snapshot = harness.snapshot(0L, 0L);
+			OperatorSnapshotUtil.writeStateHandle(snapshot,
+				"src/test/resources/cep-migration-single-pattern-afterwards-flink1.2-snapshot");
+		} finally {
+			harness.close();
+		}
 	}
 
 
@@ -465,32 +489,36 @@ public class CEPFrom12MigrationTest {
 						keySelector,
 						BasicTypeInfo.INT_TYPE_INFO);
 
-		harness.setup();
-		harness.initializeState(
+		try {
+			harness.setup();
+			harness.initializeState(
 				OperatorSnapshotUtil.readStateHandle(
-						OperatorSnapshotUtil.getResourceFilename("cep-migration-single-pattern-afterwards-flink1.2-snapshot")));
-		harness.open();
+					OperatorSnapshotUtil.getResourceFilename(
+						"cep-migration-single-pattern-afterwards-flink1.2-snapshot")));
+			harness.open();
 
-		harness.processElement(new StreamRecord<>(startEvent1, 5));
+			harness.processElement(new StreamRecord<>(startEvent1, 5));
 
-		harness.processWatermark(new Watermark(20));
+			harness.processWatermark(new Watermark(20));
 
-		ConcurrentLinkedQueue<Object> result = harness.getOutput();
+			ConcurrentLinkedQueue<Object> result = harness.getOutput();
 
-		// watermark and the result
-		assertEquals(2, result.size());
+			// watermark and the result
+			assertEquals(2, result.size());
 
-		Object resultObject = result.poll();
-		assertTrue(resultObject instanceof StreamRecord);
-		StreamRecord<?> resultRecord = (StreamRecord<?>) resultObject;
-		assertTrue(resultRecord.getValue() instanceof Map);
+			Object resultObject = result.poll();
+			assertTrue(resultObject instanceof StreamRecord);
+			StreamRecord<?> resultRecord = (StreamRecord<?>) resultObject;
+			assertTrue(resultRecord.getValue() instanceof Map);
 
-		@SuppressWarnings("unchecked")
-		Map<String, List<Event>> patternMap = (Map<String, List<Event>>) resultRecord.getValue();
+			@SuppressWarnings("unchecked")
+			Map<String, List<Event>> patternMap =
+				(Map<String, List<Event>>) resultRecord.getValue();
 
-		assertEquals(startEvent1, patternMap.get("start").get(0));
-
-		harness.close();
+			assertEquals(startEvent1, patternMap.get("start").get(0));
+		} finally {
+			harness.close();
+		}
 	}
 
 	private static class SinglePatternNFAFactory implements NFACompiler.NFAFactory<Event> {

http://git-wip-us.apache.org/repos/asf/flink/blob/38c45f80/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java
index d575e43..c92f772 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java
@@ -111,81 +111,86 @@ public class CEPMigration11to13Test {
 						keySelector,
 						BasicTypeInfo.INT_TYPE_INFO);
 
-		harness.setup();
-		harness.initializeStateFromLegacyCheckpoint(getResourceFilename("cep-keyed-1_1-snapshot"));
-		harness.open();
+		try {
+			harness.setup();
+			harness
+				.initializeStateFromLegacyCheckpoint(getResourceFilename("cep-keyed-1_1-snapshot"));
+			harness.open();
 
-		harness.processElement(new StreamRecord<>(new Event(42, "start", 1.0), 4));
-		harness.processElement(new StreamRecord<>(endEvent, 5));
+			harness.processElement(new StreamRecord<>(new Event(42, "start", 1.0), 4));
+			harness.processElement(new StreamRecord<>(endEvent, 5));
 
-		harness.processWatermark(new Watermark(20));
+			harness.processWatermark(new Watermark(20));
 
-		ConcurrentLinkedQueue<Object> result = harness.getOutput();
+			ConcurrentLinkedQueue<Object> result = harness.getOutput();
 
-		// watermark and the result
-		assertEquals(2, result.size());
+			// watermark and the result
+			assertEquals(2, result.size());
 
-		Object resultObject = result.poll();
-		assertTrue(resultObject instanceof StreamRecord);
-		StreamRecord<?> resultRecord = (StreamRecord<?>) resultObject;
-		assertTrue(resultRecord.getValue() instanceof Map);
+			Object resultObject = result.poll();
+			assertTrue(resultObject instanceof StreamRecord);
+			StreamRecord<?> resultRecord = (StreamRecord<?>) resultObject;
+			assertTrue(resultRecord.getValue() instanceof Map);
 
-		@SuppressWarnings("unchecked")
-		Map<String, List<Event>> patternMap = (Map<String, List<Event>>) resultRecord.getValue();
+			@SuppressWarnings("unchecked")
+			Map<String, List<Event>> patternMap =
+				(Map<String, List<Event>>) resultRecord.getValue();
 
-		assertEquals(startEvent, patternMap.get("start").get(0));
-		assertEquals(middleEvent, patternMap.get("middle").get(0));
-		assertEquals(endEvent, patternMap.get("end").get(0));
+			assertEquals(startEvent, patternMap.get("start").get(0));
+			assertEquals(middleEvent, patternMap.get("middle").get(0));
+			assertEquals(endEvent, patternMap.get("end").get(0));
 
-		// and now go for a checkpoint with the new serializers
+			// and now go for a checkpoint with the new serializers
 
-		final Event startEvent1 = new Event(42, "start", 2.0);
-		final SubEvent middleEvent1 = new SubEvent(42, "foo", 1.0, 11.0);
-		final Event endEvent1 = new Event(42, "end", 2.0);
+			final Event startEvent1 = new Event(42, "start", 2.0);
+			final SubEvent middleEvent1 = new SubEvent(42, "foo", 1.0, 11.0);
+			final Event endEvent1 = new Event(42, "end", 2.0);
 
-		harness.processElement(new StreamRecord<Event>(startEvent1, 21));
-		harness.processElement(new StreamRecord<Event>(middleEvent1, 23));
+			harness.processElement(new StreamRecord<Event>(startEvent1, 21));
+			harness.processElement(new StreamRecord<Event>(middleEvent1, 23));
 
-		// simulate snapshot/restore with some elements in internal sorting queue
-		OperatorStateHandles snapshot = harness.snapshot(1L, 1L);
-		harness.close();
+			// simulate snapshot/restore with some elements in internal sorting queue
+			OperatorStateHandles snapshot = harness.snapshot(1L, 1L);
+			harness.close();
 
-		harness = new KeyedOneInputStreamOperatorTestHarness<>(
-						new KeyedCEPPatternOperator<>(
-								Event.createTypeSerializer(),
-								false,
-								IntSerializer.INSTANCE,
-								new NFAFactory(),
-								true),
-						keySelector,
-						BasicTypeInfo.INT_TYPE_INFO);
-
-		harness.setup();
-		harness.initializeState(snapshot);
-		harness.open();
+			harness = new KeyedOneInputStreamOperatorTestHarness<>(
+				new KeyedCEPPatternOperator<>(
+					Event.createTypeSerializer(),
+					false,
+					IntSerializer.INSTANCE,
+					new NFAFactory(),
+					true),
+				keySelector,
+				BasicTypeInfo.INT_TYPE_INFO);
 
-		harness.processElement(new StreamRecord<>(endEvent1, 25));
+			harness.setup();
+			harness.initializeState(snapshot);
+			harness.open();
 
-		harness.processWatermark(new Watermark(50));
+			harness.processElement(new StreamRecord<>(endEvent1, 25));
 
-		result = harness.getOutput();
+			harness.processWatermark(new Watermark(50));
 
-		// watermark and the result
-		assertEquals(2, result.size());
+			result = harness.getOutput();
 
-		Object resultObject1 = result.poll();
-		assertTrue(resultObject1 instanceof StreamRecord);
-		StreamRecord<?> resultRecord1 = (StreamRecord<?>) resultObject1;
-		assertTrue(resultRecord1.getValue() instanceof Map);
+			// watermark and the result
+			assertEquals(2, result.size());
 
-		@SuppressWarnings("unchecked")
-		Map<String, List<Event>> patternMap1 = (Map<String, List<Event>>) resultRecord1.getValue();
+			Object resultObject1 = result.poll();
+			assertTrue(resultObject1 instanceof StreamRecord);
+			StreamRecord<?> resultRecord1 = (StreamRecord<?>) resultObject1;
+			assertTrue(resultRecord1.getValue() instanceof Map);
 
-		assertEquals(startEvent1, patternMap1.get("start").get(0));
-		assertEquals(middleEvent1, patternMap1.get("middle").get(0));
-		assertEquals(endEvent1, patternMap1.get("end").get(0));
+			@SuppressWarnings("unchecked")
+			Map<String, List<Event>> patternMap1 =
+				(Map<String, List<Event>>) resultRecord1.getValue();
 
-		harness.close();
+			assertEquals(startEvent1, patternMap1.get("start").get(0));
+			assertEquals(middleEvent1, patternMap1.get("middle").get(0));
+			assertEquals(endEvent1, patternMap1.get("end").get(0));
+		} finally {
+			harness.close();
+		}
 	}
 
 	@Test
@@ -233,81 +238,86 @@ public class CEPMigration11to13Test {
 						keySelector,
 						BasicTypeInfo.BYTE_TYPE_INFO);
 
-		harness.setup();
-		harness.initializeStateFromLegacyCheckpoint(getResourceFilename("cep-non-keyed-1.1-snapshot"));
-		harness.open();
+		try {
+			harness.setup();
+			harness.initializeStateFromLegacyCheckpoint(
+				getResourceFilename("cep-non-keyed-1.1-snapshot"));
+			harness.open();
 
-		harness.processElement(new StreamRecord<>(new Event(42, "start", 1.0), 4));
-		harness.processElement(new StreamRecord<>(endEvent, 5));
+			harness.processElement(new StreamRecord<>(new Event(42, "start", 1.0), 4));
+			harness.processElement(new StreamRecord<>(endEvent, 5));
 
-		harness.processWatermark(new Watermark(20));
+			harness.processWatermark(new Watermark(20));
 
-		ConcurrentLinkedQueue<Object> result = harness.getOutput();
+			ConcurrentLinkedQueue<Object> result = harness.getOutput();
 
-		// watermark and the result
-		assertEquals(2, result.size());
+			// watermark and the result
+			assertEquals(2, result.size());
 
-		Object resultObject = result.poll();
-		assertTrue(resultObject instanceof StreamRecord);
-		StreamRecord<?> resultRecord = (StreamRecord<?>) resultObject;
-		assertTrue(resultRecord.getValue() instanceof Map);
+			Object resultObject = result.poll();
+			assertTrue(resultObject instanceof StreamRecord);
+			StreamRecord<?> resultRecord = (StreamRecord<?>) resultObject;
+			assertTrue(resultRecord.getValue() instanceof Map);
 
-		@SuppressWarnings("unchecked")
-		Map<String, List<Event>> patternMap = (Map<String, List<Event>>) resultRecord.getValue();
+			@SuppressWarnings("unchecked")
+			Map<String, List<Event>> patternMap =
+				(Map<String, List<Event>>) resultRecord.getValue();
 
-		assertEquals(startEvent, patternMap.get("start").get(0));
-		assertEquals(middleEvent, patternMap.get("middle").get(0));
-		assertEquals(endEvent, patternMap.get("end").get(0));
+			assertEquals(startEvent, patternMap.get("start").get(0));
+			assertEquals(middleEvent, patternMap.get("middle").get(0));
+			assertEquals(endEvent, patternMap.get("end").get(0));
 
-		// and now go for a checkpoint with the new serializers
+			// and now go for a checkpoint with the new serializers
 
-		final Event startEvent1 = new Event(42, "start", 2.0);
-		final SubEvent middleEvent1 = new SubEvent(42, "foo", 1.0, 11.0);
-		final Event endEvent1 = new Event(42, "end", 2.0);
+			final Event startEvent1 = new Event(42, "start", 2.0);
+			final SubEvent middleEvent1 = new SubEvent(42, "foo", 1.0, 11.0);
+			final Event endEvent1 = new Event(42, "end", 2.0);
 
-		harness.processElement(new StreamRecord<Event>(startEvent1, 21));
-		harness.processElement(new StreamRecord<Event>(middleEvent1, 23));
+			harness.processElement(new StreamRecord<Event>(startEvent1, 21));
+			harness.processElement(new StreamRecord<Event>(middleEvent1, 23));
 
-		// simulate snapshot/restore with some elements in internal sorting queue
-		OperatorStateHandles snapshot = harness.snapshot(1L, 1L);
-		harness.close();
+			// simulate snapshot/restore with some elements in internal sorting queue
+			OperatorStateHandles snapshot = harness.snapshot(1L, 1L);
+			harness.close();
 
-		harness = new KeyedOneInputStreamOperatorTestHarness<>(
-						new KeyedCEPPatternOperator<>(
-								Event.createTypeSerializer(),
-								false,
-								ByteSerializer.INSTANCE,
-								new NFAFactory(),
-								false),
-						keySelector,
-						BasicTypeInfo.BYTE_TYPE_INFO);
-
-		harness.setup();
-		harness.initializeState(snapshot);
-		harness.open();
+			harness = new KeyedOneInputStreamOperatorTestHarness<>(
+				new KeyedCEPPatternOperator<>(
+					Event.createTypeSerializer(),
+					false,
+					ByteSerializer.INSTANCE,
+					new NFAFactory(),
+					false),
+				keySelector,
+				BasicTypeInfo.BYTE_TYPE_INFO);
 
-		harness.processElement(new StreamRecord<>(endEvent1, 25));
+			harness.setup();
+			harness.initializeState(snapshot);
+			harness.open();
 
-		harness.processWatermark(new Watermark(50));
+			harness.processElement(new StreamRecord<>(endEvent1, 25));
 
-		result = harness.getOutput();
+			harness.processWatermark(new Watermark(50));
 
-		// watermark and the result
-		assertEquals(2, result.size());
+			result = harness.getOutput();
 
-		Object resultObject1 = result.poll();
-		assertTrue(resultObject1 instanceof StreamRecord);
-		StreamRecord<?> resultRecord1 = (StreamRecord<?>) resultObject1;
-		assertTrue(resultRecord1.getValue() instanceof Map);
+			// watermark and the result
+			assertEquals(2, result.size());
 
-		@SuppressWarnings("unchecked")
-		Map<String, List<Event>> patternMap1 = (Map<String, List<Event>>) resultRecord1.getValue();
+			Object resultObject1 = result.poll();
+			assertTrue(resultObject1 instanceof StreamRecord);
+			StreamRecord<?> resultRecord1 = (StreamRecord<?>) resultObject1;
+			assertTrue(resultRecord1.getValue() instanceof Map);
 
-		assertEquals(startEvent1, patternMap1.get("start").get(0));
-		assertEquals(middleEvent1, patternMap1.get("middle").get(0));
-		assertEquals(endEvent1, patternMap1.get("end").get(0));
+			@SuppressWarnings("unchecked")
+			Map<String, List<Event>> patternMap1 =
+				(Map<String, List<Event>>) resultRecord1.getValue();
 
-		harness.close();
+			assertEquals(startEvent1, patternMap1.get("start").get(0));
+			assertEquals(middleEvent1, patternMap1.get("middle").get(0));
+			assertEquals(endEvent1, patternMap1.get("end").get(0));
+		} finally {
+			harness.close();
+		}
 	}
 
 	private static class NFAFactory implements NFACompiler.NFAFactory<Event> {

http://git-wip-us.apache.org/repos/asf/flink/blob/38c45f80/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
index bf1436d..38ad0f1 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
@@ -67,15 +67,17 @@ public class CEPOperatorTest extends TestLogger {
 
 		OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness = getCepTestHarness(false);
 
-		harness.open();
-
-		Watermark expectedWatermark = new Watermark(42L);
+		try {
+			harness.open();
 
-		harness.processWatermark(expectedWatermark);
+			Watermark expectedWatermark = new Watermark(42L);
 
-		verifyWatermark(harness.getOutput().poll(), 42L);
+			harness.processWatermark(expectedWatermark);
 
-		harness.close();
+			verifyWatermark(harness.getOutput().poll(), 42L);
+		} finally {
+			harness.close();
+		}
 	}
 
 	@Test
@@ -83,59 +85,62 @@ public class CEPOperatorTest extends TestLogger {
 
 		OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness = getCepTestHarness(false);
 
-		harness.open();
-
-		Event startEvent = new Event(42, "start", 1.0);
-		SubEvent middleEvent = new SubEvent(42, "foo", 1.0, 10.0);
-		Event endEvent=  new Event(42, "end", 1.0);
+		try {
+			harness.open();
 
-		harness.processElement(new StreamRecord<>(startEvent, 1L));
-		harness.processElement(new StreamRecord<>(new Event(42, "foobar", 1.0), 2L));
+			Event startEvent = new Event(42, "start", 1.0);
+			SubEvent middleEvent = new SubEvent(42, "foo", 1.0, 10.0);
+			Event endEvent = new Event(42, "end", 1.0);
 
-		// simulate snapshot/restore with some elements in internal sorting queue
-		OperatorStateHandles snapshot = harness.snapshot(0L, 0L);
-		harness.close();
+			harness.processElement(new StreamRecord<>(startEvent, 1L));
+			harness.processElement(new StreamRecord<>(new Event(42, "foobar", 1.0), 2L));
 
-		harness = getCepTestHarness(false);
+			// simulate snapshot/restore with some elements in internal sorting queue
+			OperatorStateHandles snapshot = harness.snapshot(0L, 0L);
+			harness.close();
 
-		harness.setup();
-		harness.initializeState(snapshot);
-		harness.open();
+			harness = getCepTestHarness(false);
 
-		harness.processWatermark(new Watermark(Long.MIN_VALUE));
+			harness.setup();
+			harness.initializeState(snapshot);
+			harness.open();
 
-		harness.processElement(new StreamRecord<Event>(new SubEvent(42, "barfoo", 1.0, 5.0), 3L));
+			harness.processWatermark(new Watermark(Long.MIN_VALUE));
 
-		// if element timestamps are not correctly checkpointed/restored this will lead to
-		// a pruning time underflow exception in NFA
-		harness.processWatermark(new Watermark(2L));
+			harness
+				.processElement(new StreamRecord<Event>(new SubEvent(42, "barfoo", 1.0, 5.0), 3L));
 
-		harness.processElement(new StreamRecord<Event>(middleEvent, 3L));
-		harness.processElement(new StreamRecord<>(new Event(42, "start", 1.0), 4L));
-		harness.processElement(new StreamRecord<>(endEvent, 5L));
+			// if element timestamps are not correctly checkpointed/restored this will lead to
+			// a pruning time underflow exception in NFA
+			harness.processWatermark(new Watermark(2L));
 
-		// simulate snapshot/restore with empty element queue but NFA state
-		OperatorStateHandles snapshot2 = harness.snapshot(1L, 1L);
-		harness.close();
+			harness.processElement(new StreamRecord<Event>(middleEvent, 3L));
+			harness.processElement(new StreamRecord<>(new Event(42, "start", 1.0), 4L));
+			harness.processElement(new StreamRecord<>(endEvent, 5L));
 
-		harness = getCepTestHarness(false);
+			// simulate snapshot/restore with empty element queue but NFA state
+			OperatorStateHandles snapshot2 = harness.snapshot(1L, 1L);
+			harness.close();
 
-		harness.setup();
-		harness.initializeState(snapshot2);
-		harness.open();
+			harness = getCepTestHarness(false);
 
-		harness.processWatermark(new Watermark(Long.MAX_VALUE));
+			harness.setup();
+			harness.initializeState(snapshot2);
+			harness.open();
 
-		// get and verify the output
+			harness.processWatermark(new Watermark(Long.MAX_VALUE));
 
-		Queue<Object> result = harness.getOutput();
+			// get and verify the output
 
-		assertEquals(2, result.size());
+			Queue<Object> result = harness.getOutput();
 
-		verifyPattern(result.poll(), startEvent, middleEvent, endEvent);
-		verifyWatermark(result.poll(), Long.MAX_VALUE);
+			assertEquals(2, result.size());
 
-		harness.close();
+			verifyPattern(result.poll(), startEvent, middleEvent, endEvent);
+			verifyWatermark(result.poll(), Long.MAX_VALUE);
+		} finally {
+			harness.close();
+		}
 	}
 
 	@Test
@@ -147,68 +152,71 @@ public class CEPOperatorTest extends TestLogger {
 
 		OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness = getCepTestHarness(false);
 
-		harness.setStateBackend(rocksDBStateBackend);
-
-		harness.open();
+		try {
+			harness.setStateBackend(rocksDBStateBackend);
 
-		Event startEvent = new Event(42, "start", 1.0);
-		SubEvent middleEvent = new SubEvent(42, "foo", 1.0, 10.0);
-		Event endEvent=  new Event(42, "end", 1.0);
+			harness.open();
 
-		harness.processElement(new StreamRecord<>(startEvent, 1L));
-		harness.processElement(new StreamRecord<>(new Event(42, "foobar", 1.0), 2L));
+			Event startEvent = new Event(42, "start", 1.0);
+			SubEvent middleEvent = new SubEvent(42, "foo", 1.0, 10.0);
+			Event endEvent = new Event(42, "end", 1.0);
 
-		// simulate snapshot/restore with some elements in internal sorting queue
-		OperatorStateHandles snapshot = harness.snapshot(0L, 0L);
-		harness.close();
+			harness.processElement(new StreamRecord<>(startEvent, 1L));
+			harness.processElement(new StreamRecord<>(new Event(42, "foobar", 1.0), 2L));
 
-		harness = getCepTestHarness(false);
+			// simulate snapshot/restore with some elements in internal sorting queue
+			OperatorStateHandles snapshot = harness.snapshot(0L, 0L);
+			harness.close();
 
-		rocksDBStateBackend = new RocksDBStateBackend(new MemoryStateBackend());
-		rocksDBStateBackend.setDbStoragePath(rocksDbPath);
-		harness.setStateBackend(rocksDBStateBackend);
+			harness = getCepTestHarness(false);
 
-		harness.setup();
-		harness.initializeState(snapshot);
-		harness.open();
+			rocksDBStateBackend = new RocksDBStateBackend(new MemoryStateBackend());
+			rocksDBStateBackend.setDbStoragePath(rocksDbPath);
+			harness.setStateBackend(rocksDBStateBackend);
 
-		harness.processWatermark(new Watermark(Long.MIN_VALUE));
+			harness.setup();
+			harness.initializeState(snapshot);
+			harness.open();
 
-		harness.processElement(new StreamRecord<Event>(new SubEvent(42, "barfoo", 1.0, 5.0), 3L));
+			harness.processWatermark(new Watermark(Long.MIN_VALUE));
 
-		// if element timestamps are not correctly checkpointed/restored this will lead to
-		// a pruning time underflow exception in NFA
-		harness.processWatermark(new Watermark(2L));
+			harness
+				.processElement(new StreamRecord<Event>(new SubEvent(42, "barfoo", 1.0, 5.0), 3L));
 
-		// simulate snapshot/restore with empty element queue but NFA state
-		OperatorStateHandles snapshot2 = harness.snapshot(1L, 1L);
-		harness.close();
+			// if element timestamps are not correctly checkpointed/restored this will lead to
+			// a pruning time underflow exception in NFA
+			harness.processWatermark(new Watermark(2L));
 
-		harness = getCepTestHarness(false);
+			// simulate snapshot/restore with empty element queue but NFA state
+			OperatorStateHandles snapshot2 = harness.snapshot(1L, 1L);
+			harness.close();
 
-		rocksDBStateBackend = new RocksDBStateBackend(new MemoryStateBackend());
-		rocksDBStateBackend.setDbStoragePath(rocksDbPath);
-		harness.setStateBackend(rocksDBStateBackend);
-		harness.setup();
-		harness.initializeState(snapshot2);
-		harness.open();
+			harness = getCepTestHarness(false);
 
-		harness.processElement(new StreamRecord<Event>(middleEvent, 3L));
-		harness.processElement(new StreamRecord<>(new Event(42, "start", 1.0), 4L));
-		harness.processElement(new StreamRecord<>(endEvent, 5L));
+			rocksDBStateBackend = new RocksDBStateBackend(new MemoryStateBackend());
+			rocksDBStateBackend.setDbStoragePath(rocksDbPath);
+			harness.setStateBackend(rocksDBStateBackend);
+			harness.setup();
+			harness.initializeState(snapshot2);
+			harness.open();
 
-		harness.processWatermark(new Watermark(Long.MAX_VALUE));
+			harness.processElement(new StreamRecord<Event>(middleEvent, 3L));
+			harness.processElement(new StreamRecord<>(new Event(42, "start", 1.0), 4L));
+			harness.processElement(new StreamRecord<>(endEvent, 5L));
 
-		// get and verify the output
+			harness.processWatermark(new Watermark(Long.MAX_VALUE));
 
-		Queue<Object> result = harness.getOutput();
+			// get and verify the output
 
-		assertEquals(2, result.size());
+			Queue<Object> result = harness.getOutput();
 
-		verifyPattern(result.poll(), startEvent, middleEvent, endEvent);
-		verifyWatermark(result.poll(), Long.MAX_VALUE);
+			assertEquals(2, result.size());
 
-		harness.close();
+			verifyPattern(result.poll(), startEvent, middleEvent, endEvent);
+			verifyWatermark(result.poll(), Long.MAX_VALUE);
+		} finally {
+			harness.close();
+		}
 	}
 
 	/**
@@ -299,85 +307,88 @@ public class CEPOperatorTest extends TestLogger {
 		KeyedCEPPatternOperator<Event, Integer> operator = getKeyedCepOpearator(false);
 		OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness = getCepTestHarness(operator);
 
-		harness.open();
-
-		harness.processWatermark(new Watermark(Long.MIN_VALUE));
-
-		harness.processElement(new StreamRecord<>(new Event(42, "foobar", 1.0), 2L));
-		harness.processElement(new StreamRecord<Event>(middleEvent1, 2L));
-		harness.processElement(new StreamRecord<Event>(new SubEvent(42, "barfoo", 1.0, 5.0), 3L));
-		harness.processElement(new StreamRecord<>(startEvent1, 1L));
-		harness.processElement(new StreamRecord<>(startEventK2, 1L));
-
-		// there must be 2 keys 42, 43 registered for the watermark callback
-		// all the seen elements must be in the priority queues but no NFA yet.
-
-		assertEquals(2L, harness.numEventTimeTimers());
-		assertEquals(4L, operator.getPQSize(42));
-		assertEquals(1L, operator.getPQSize(43));
-		assertTrue(!operator.hasNonEmptyNFA(42));
-		assertTrue(!operator.hasNonEmptyNFA(43));
-
-		harness.processWatermark(new Watermark(2L));
-
-		verifyWatermark(harness.getOutput().poll(), Long.MIN_VALUE);
-		verifyWatermark(harness.getOutput().poll(), 2L);
-
-		// still the 2 keys
-		// one element in PQ for 42 (the barfoo) as it arrived early
-		// for 43 the element entered the NFA and the PQ is empty
-
-		assertEquals(2L, harness.numEventTimeTimers());
-		assertTrue(operator.hasNonEmptyNFA(42));
-		assertEquals(1L, operator.getPQSize(42));
-		assertTrue(operator.hasNonEmptyNFA(43));
-		assertTrue(!operator.hasNonEmptyPQ(43));
-
-		harness.processElement(new StreamRecord<>(startEvent2, 4L));
-		harness.processElement(new StreamRecord<Event>(middleEvent2, 5L));
-
-		OperatorStateHandles snapshot = harness.snapshot(0L, 0L);
-		harness.close();
-
-		KeyedCEPPatternOperator<Event, Integer> operator2 = getKeyedCepOpearator(false);
-		harness = getCepTestHarness(operator2);
-		harness.setup();
-		harness.initializeState(snapshot);
-		harness.open();
-
-		harness.processElement(new StreamRecord<>(endEvent1, 6L));
-		harness.processWatermark(11L);
-		harness.processWatermark(12L);
-
-		// now we have 1 key because the 43 expired and was removed.
-		// 42 is still there due to startEvent2
-		assertEquals(1L, harness.numEventTimeTimers());
-		assertTrue(operator2.hasNonEmptyNFA(42));
-		assertTrue(!operator2.hasNonEmptyPQ(42));
-		assertTrue(!operator2.hasNonEmptyNFA(43));
-		assertTrue(!operator2.hasNonEmptyPQ(43));
-
-		verifyPattern(harness.getOutput().poll(), startEvent1, middleEvent1, endEvent1);
-		verifyPattern(harness.getOutput().poll(), startEvent1, middleEvent2, endEvent1);
-		verifyPattern(harness.getOutput().poll(), startEvent2, middleEvent2, endEvent1);
-		verifyWatermark(harness.getOutput().poll(), 11L);
-		verifyWatermark(harness.getOutput().poll(), 12L);
-
-		harness.processElement(new StreamRecord<Event>(middleEvent3, 12L));
-		harness.processElement(new StreamRecord<>(endEvent2, 13L));
-		harness.processWatermark(20L);
-		harness.processWatermark(21L);
-
-		assertTrue(!operator2.hasNonEmptyNFA(42));
-		assertTrue(!operator2.hasNonEmptyPQ(42));
-		assertEquals(0L, harness.numEventTimeTimers());
-
-		verifyPattern(harness.getOutput().poll(), startEvent2, middleEvent2, endEvent2);
-		verifyPattern(harness.getOutput().poll(), startEvent2, middleEvent3, endEvent2);
-		verifyWatermark(harness.getOutput().poll(), 20L);
-		verifyWatermark(harness.getOutput().poll(), 21L);
-
-		harness.close();
+		try {
+			harness.open();
+
+			harness.processWatermark(new Watermark(Long.MIN_VALUE));
+
+			harness.processElement(new StreamRecord<>(new Event(42, "foobar", 1.0), 2L));
+			harness.processElement(new StreamRecord<Event>(middleEvent1, 2L));
+			harness
+				.processElement(new StreamRecord<Event>(new SubEvent(42, "barfoo", 1.0, 5.0), 3L));
+			harness.processElement(new StreamRecord<>(startEvent1, 1L));
+			harness.processElement(new StreamRecord<>(startEventK2, 1L));
+
+			// there must be 2 keys 42, 43 registered for the watermark callback
+			// all the seen elements must be in the priority queues but no NFA yet.
+
+			assertEquals(2L, harness.numEventTimeTimers());
+			assertEquals(4L, operator.getPQSize(42));
+			assertEquals(1L, operator.getPQSize(43));
+			assertTrue(!operator.hasNonEmptyNFA(42));
+			assertTrue(!operator.hasNonEmptyNFA(43));
+
+			harness.processWatermark(new Watermark(2L));
+
+			verifyWatermark(harness.getOutput().poll(), Long.MIN_VALUE);
+			verifyWatermark(harness.getOutput().poll(), 2L);
+
+			// still the 2 keys
+			// one element in PQ for 42 (the barfoo) as it arrived early
+			// for 43 the element entered the NFA and the PQ is empty
+
+			assertEquals(2L, harness.numEventTimeTimers());
+			assertTrue(operator.hasNonEmptyNFA(42));
+			assertEquals(1L, operator.getPQSize(42));
+			assertTrue(operator.hasNonEmptyNFA(43));
+			assertTrue(!operator.hasNonEmptyPQ(43));
+
+			harness.processElement(new StreamRecord<>(startEvent2, 4L));
+			harness.processElement(new StreamRecord<Event>(middleEvent2, 5L));
+
+			OperatorStateHandles snapshot = harness.snapshot(0L, 0L);
+			harness.close();
+
+			KeyedCEPPatternOperator<Event, Integer> operator2 = getKeyedCepOpearator(false);
+			harness = getCepTestHarness(operator2);
+			harness.setup();
+			harness.initializeState(snapshot);
+			harness.open();
+
+			harness.processElement(new StreamRecord<>(endEvent1, 6L));
+			harness.processWatermark(11L);
+			harness.processWatermark(12L);
+
+			// now we have 1 key because the 43 expired and was removed.
+			// 42 is still there due to startEvent2
+			assertEquals(1L, harness.numEventTimeTimers());
+			assertTrue(operator2.hasNonEmptyNFA(42));
+			assertTrue(!operator2.hasNonEmptyPQ(42));
+			assertTrue(!operator2.hasNonEmptyNFA(43));
+			assertTrue(!operator2.hasNonEmptyPQ(43));
+
+			verifyPattern(harness.getOutput().poll(), startEvent1, middleEvent1, endEvent1);
+			verifyPattern(harness.getOutput().poll(), startEvent1, middleEvent2, endEvent1);
+			verifyPattern(harness.getOutput().poll(), startEvent2, middleEvent2, endEvent1);
+			verifyWatermark(harness.getOutput().poll(), 11L);
+			verifyWatermark(harness.getOutput().poll(), 12L);
+
+			harness.processElement(new StreamRecord<Event>(middleEvent3, 12L));
+			harness.processElement(new StreamRecord<>(endEvent2, 13L));
+			harness.processWatermark(20L);
+			harness.processWatermark(21L);
+
+			assertTrue(!operator2.hasNonEmptyNFA(42));
+			assertTrue(!operator2.hasNonEmptyPQ(42));
+			assertEquals(0L, harness.numEventTimeTimers());
+
+			verifyPattern(harness.getOutput().poll(), startEvent2, middleEvent2, endEvent2);
+			verifyPattern(harness.getOutput().poll(), startEvent2, middleEvent3, endEvent2);
+			verifyWatermark(harness.getOutput().poll(), 20L);
+			verifyWatermark(harness.getOutput().poll(), 21L);
+		} finally {
+			harness.close();
+		}
 	}
 
 	@Test
@@ -397,49 +408,51 @@ public class CEPOperatorTest extends TestLogger {
 				true);
 		OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness = getCepTestHarness(operator);
 
-		harness.open();
+		try {
+			harness.open();
 
-		harness.processWatermark(new Watermark(Long.MIN_VALUE));
+			harness.processWatermark(new Watermark(Long.MIN_VALUE));
 
-		harness.processElement(new StreamRecord<>(middle2Event1, 6));
-		harness.processElement(new StreamRecord<>(middle1Event3, 7));
-		harness.processElement(new StreamRecord<>(startEvent, 1));
-		harness.processElement(new StreamRecord<>(middle1Event1, 3));
-		harness.processElement(new StreamRecord<>(middle1Event2, 3));
-		harness.processElement(new StreamRecord<>(middle1Event1, 3));
-		harness.processElement(new StreamRecord<>(new Event(41, "d", 6.0), 5));
+			harness.processElement(new StreamRecord<>(middle2Event1, 6));
+			harness.processElement(new StreamRecord<>(middle1Event3, 7));
+			harness.processElement(new StreamRecord<>(startEvent, 1));
+			harness.processElement(new StreamRecord<>(middle1Event1, 3));
+			harness.processElement(new StreamRecord<>(middle1Event2, 3));
+			harness.processElement(new StreamRecord<>(middle1Event1, 3));
+			harness.processElement(new StreamRecord<>(new Event(41, "d", 6.0), 5));
 
-		assertEquals(1L, harness.numEventTimeTimers());
-		assertEquals(7L, operator.getPQSize(41));
-		assertTrue(!operator.hasNonEmptyNFA(41));
+			assertEquals(1L, harness.numEventTimeTimers());
+			assertEquals(7L, operator.getPQSize(41));
+			assertTrue(!operator.hasNonEmptyNFA(41));
 
-		harness.processWatermark(new Watermark(2L));
+			harness.processWatermark(new Watermark(2L));
 
-		verifyWatermark(harness.getOutput().poll(), Long.MIN_VALUE);
-		verifyWatermark(harness.getOutput().poll(), 2L);
+			verifyWatermark(harness.getOutput().poll(), Long.MIN_VALUE);
+			verifyWatermark(harness.getOutput().poll(), 2L);
 
-		assertEquals(1L, harness.numEventTimeTimers());
-		assertEquals(6L, operator.getPQSize(41));
-		assertTrue(operator.hasNonEmptyNFA(41)); // processed the first element
+			assertEquals(1L, harness.numEventTimeTimers());
+			assertEquals(6L, operator.getPQSize(41));
+			assertTrue(operator.hasNonEmptyNFA(41)); // processed the first element
 
-		harness.processWatermark(new Watermark(8L));
+			harness.processWatermark(new Watermark(8L));
 
-		List<List<Event>> resultingPatterns = new ArrayList<>();
-		while (!harness.getOutput().isEmpty()) {
-			Object o = harness.getOutput().poll();
-			if (!(o instanceof Watermark)) {
-				StreamRecord<Map<String, List<Event>>> el = (StreamRecord<Map<String, List<Event>>>) o;
-				List<Event> res = new ArrayList<>();
-				for (List<Event> le: el.getValue().values()) {
-					res.addAll(le);
+			List<List<Event>> resultingPatterns = new ArrayList<>();
+			while (!harness.getOutput().isEmpty()) {
+				Object o = harness.getOutput().poll();
+				if (!(o instanceof Watermark)) {
+					StreamRecord<Map<String, List<Event>>> el =
+						(StreamRecord<Map<String, List<Event>>>) o;
+					List<Event> res = new ArrayList<>();
+					for (List<Event> le : el.getValue().values()) {
+						res.addAll(le);
+					}
+					resultingPatterns.add(res);
+				} else {
+					verifyWatermark(o, 8L);
 				}
-				resultingPatterns.add(res);
-			} else {
-				verifyWatermark(o, 8L);
 			}
-		}
 
-		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
+			compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
 				Lists.newArrayList(startEvent, middle1Event1),
 
 				Lists.newArrayList(startEvent, middle1Event1, middle1Event2),
@@ -448,24 +461,28 @@ public class CEPOperatorTest extends TestLogger {
 				Lists.newArrayList(startEvent, middle1Event1, middle1Event2, middle1Event1),
 				Lists.newArrayList(startEvent, middle1Event1, middle2Event1, middle1Event3),
 
-				Lists.newArrayList(startEvent, middle1Event1, middle1Event1, middle1Event2, middle1Event3),
-				Lists.newArrayList(startEvent, middle1Event1, middle1Event2, middle2Event1, middle1Event3),
+				Lists.newArrayList(startEvent, middle1Event1, middle1Event1, middle1Event2,
+					middle1Event3),
+				Lists.newArrayList(startEvent, middle1Event1, middle1Event2, middle2Event1,
+					middle1Event3),
 
-				Lists.newArrayList(startEvent, middle1Event1, middle1Event1, middle1Event2, middle2Event1, middle1Event3)
-		));
+				Lists.newArrayList(startEvent, middle1Event1, middle1Event1, middle1Event2,
+					middle2Event1, middle1Event3)
+			));
 
-		assertEquals(1L, harness.numEventTimeTimers());
-		assertEquals(0L, operator.getPQSize(41));
-		assertTrue(operator.hasNonEmptyNFA(41));
+			assertEquals(1L, harness.numEventTimeTimers());
+			assertEquals(0L, operator.getPQSize(41));
+			assertTrue(operator.hasNonEmptyNFA(41));
 
-		harness.processWatermark(new Watermark(17L));
-		verifyWatermark(harness.getOutput().poll(), 17L);
+			harness.processWatermark(new Watermark(17L));
+			verifyWatermark(harness.getOutput().poll(), 17L);
 
-		assertTrue(!operator.hasNonEmptyNFA(41));
-		assertTrue(!operator.hasNonEmptyPQ(41));
-		assertEquals(0L, harness.numEventTimeTimers());
-
-		harness.close();
+			assertTrue(!operator.hasNonEmptyNFA(41));
+			assertTrue(!operator.hasNonEmptyPQ(41));
+			assertEquals(0L, harness.numEventTimeTimers());
+		} finally {
+			harness.close();
+		}
 	}
 
 	@Test
@@ -484,70 +501,73 @@ public class CEPOperatorTest extends TestLogger {
 		KeyedCEPPatternOperator<Event, Integer> operator = getKeyedCepOpearator(true);
 		OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness = getCepTestHarness(operator);
 
-		harness.open();
-
-		harness.setProcessingTime(0L);
+		try {
+			harness.open();
 
-		harness.processElement(new StreamRecord<>(startEvent1, 1L));
-		harness.processElement(new StreamRecord<>(startEventK2, 1L));
-		harness.processElement(new StreamRecord<>(new Event(42, "foobar", 1.0), 2L));
-		harness.processElement(new StreamRecord<Event>(middleEvent1, 2L));
-		harness.processElement(new StreamRecord<Event>(new SubEvent(42, "barfoo", 1.0, 5.0), 3L));
+			harness.setProcessingTime(0L);
 
-		assertTrue(!operator.hasNonEmptyPQ(42));
-		assertTrue(!operator.hasNonEmptyPQ(43));
-		assertTrue(operator.hasNonEmptyNFA(42));
-		assertTrue(operator.hasNonEmptyNFA(43));
+			harness.processElement(new StreamRecord<>(startEvent1, 1L));
+			harness.processElement(new StreamRecord<>(startEventK2, 1L));
+			harness.processElement(new StreamRecord<>(new Event(42, "foobar", 1.0), 2L));
+			harness.processElement(new StreamRecord<Event>(middleEvent1, 2L));
+			harness
+				.processElement(new StreamRecord<Event>(new SubEvent(42, "barfoo", 1.0, 5.0), 3L));
 
-		harness.setProcessingTime(3L);
+			assertTrue(!operator.hasNonEmptyPQ(42));
+			assertTrue(!operator.hasNonEmptyPQ(43));
+			assertTrue(operator.hasNonEmptyNFA(42));
+			assertTrue(operator.hasNonEmptyNFA(43));
 
-		harness.processElement(new StreamRecord<>(startEvent2, 3L));
-		harness.processElement(new StreamRecord<Event>(middleEvent2, 4L));
+			harness.setProcessingTime(3L);
 
-		OperatorStateHandles snapshot = harness.snapshot(0L, 0L);
-		harness.close();
+			harness.processElement(new StreamRecord<>(startEvent2, 3L));
+			harness.processElement(new StreamRecord<Event>(middleEvent2, 4L));
 
-		KeyedCEPPatternOperator<Event, Integer> operator2 = getKeyedCepOpearator(true);
-		harness = getCepTestHarness(operator2);
-		harness.setup();
-		harness.initializeState(snapshot);
-		harness.open();
+			OperatorStateHandles snapshot = harness.snapshot(0L, 0L);
+			harness.close();
 
-		harness.setProcessingTime(3L);
-		harness.processElement(new StreamRecord<>(endEvent1, 5L));
+			KeyedCEPPatternOperator<Event, Integer> operator2 = getKeyedCepOpearator(true);
+			harness = getCepTestHarness(operator2);
+			harness.setup();
+			harness.initializeState(snapshot);
+			harness.open();
 
-		verifyPattern(harness.getOutput().poll(), startEvent1, middleEvent1, endEvent1);
-		verifyPattern(harness.getOutput().poll(), startEvent1, middleEvent2, endEvent1);
-		verifyPattern(harness.getOutput().poll(), startEvent2, middleEvent2, endEvent1);
+			harness.setProcessingTime(3L);
+			harness.processElement(new StreamRecord<>(endEvent1, 5L));
 
-		harness.setProcessingTime(11L);
+			verifyPattern(harness.getOutput().poll(), startEvent1, middleEvent1, endEvent1);
+			verifyPattern(harness.getOutput().poll(), startEvent1, middleEvent2, endEvent1);
+			verifyPattern(harness.getOutput().poll(), startEvent2, middleEvent2, endEvent1);
 
-		harness.processElement(new StreamRecord<Event>(middleEvent3, 11L));
-		harness.processElement(new StreamRecord<>(endEvent2, 12L));
+			harness.setProcessingTime(11L);
 
-		verifyPattern(harness.getOutput().poll(), startEvent2, middleEvent2, endEvent2);
-		verifyPattern(harness.getOutput().poll(), startEvent2, middleEvent3, endEvent2);
+			harness.processElement(new StreamRecord<Event>(middleEvent3, 11L));
+			harness.processElement(new StreamRecord<>(endEvent2, 12L));
 
-		harness.setProcessingTime(21L);
+			verifyPattern(harness.getOutput().poll(), startEvent2, middleEvent2, endEvent2);
+			verifyPattern(harness.getOutput().poll(), startEvent2, middleEvent3, endEvent2);
 
-		assertTrue(operator2.hasNonEmptyNFA(42));
+			harness.setProcessingTime(21L);
 
-		harness.processElement(new StreamRecord<>(startEvent1, 21L));
-		assertTrue(operator2.hasNonEmptyNFA(42));
+			assertTrue(operator2.hasNonEmptyNFA(42));
 
-		harness.setProcessingTime(49L);
+			harness.processElement(new StreamRecord<>(startEvent1, 21L));
+			assertTrue(operator2.hasNonEmptyNFA(42));
 
-		// TODO: 3/13/17 we have to have another event in order to clean up
-		harness.processElement(new StreamRecord<>(new Event(42, "foobar", 1.0), 2L));
+			harness.setProcessingTime(49L);
 
-		// the pattern expired
-		assertTrue(!operator2.hasNonEmptyNFA(42));
+			// TODO: 3/13/17 we have to have another event in order to clean up
+			harness.processElement(new StreamRecord<>(new Event(42, "foobar", 1.0), 2L));
 
-		assertEquals(0L, harness.numEventTimeTimers());
-		assertTrue(!operator2.hasNonEmptyPQ(42));
-		assertTrue(!operator2.hasNonEmptyPQ(43));
+			// the pattern expired
+			assertTrue(!operator2.hasNonEmptyNFA(42));
 
-		harness.close();
+			assertEquals(0L, harness.numEventTimeTimers());
+			assertTrue(!operator2.hasNonEmptyPQ(42));
+			assertTrue(!operator2.hasNonEmptyPQ(43));
+		} finally {
+			harness.close();
+		}
 	}
 
 	@Test
@@ -614,47 +634,53 @@ public class CEPOperatorTest extends TestLogger {
 				true);
 
 		OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness = getCepTestHarness(operator);
-		harness.setStateBackend(rocksDBStateBackend);
-		harness.open();
-
-		harness.processWatermark(0L);
-		harness.processElement(new StreamRecord<>(startEvent1, 1));
-		harness.processElement(new StreamRecord<Event>(middleEvent1, 2));
-		harness.processWatermark(2L);
-		harness.processElement(new StreamRecord<Event>(middleEvent3, 5));
-		harness.processElement(new StreamRecord<Event>(middleEvent2, 3));
-		harness.processElement(new StreamRecord<>(startEvent2, 4));
-		harness.processWatermark(5L);
-		harness.processElement(new StreamRecord<>(nextOne, 6));
-		harness.processElement(new StreamRecord<>(endEvent, 8));
-		harness.processElement(new StreamRecord<Event>(middleEvent4, 5));
-		harness.processWatermark(100L);
-
-		List<List<Event>> resultingPatterns = new ArrayList<>();
-		while (!harness.getOutput().isEmpty()) {
-			Object o = harness.getOutput().poll();
-			if (!(o instanceof Watermark)) {
-				StreamRecord<Map<String, List<Event>>> el = (StreamRecord<Map<String, List<Event>>>) o;
-				List<Event> res = new ArrayList<>();
-				for (List<Event> le: el.getValue().values()) {
-					res.addAll(le);
+
+		try {
+			harness.setStateBackend(rocksDBStateBackend);
+			harness.open();
+
+			harness.processWatermark(0L);
+			harness.processElement(new StreamRecord<>(startEvent1, 1));
+			harness.processElement(new StreamRecord<Event>(middleEvent1, 2));
+			harness.processWatermark(2L);
+			harness.processElement(new StreamRecord<Event>(middleEvent3, 5));
+			harness.processElement(new StreamRecord<Event>(middleEvent2, 3));
+			harness.processElement(new StreamRecord<>(startEvent2, 4));
+			harness.processWatermark(5L);
+			harness.processElement(new StreamRecord<>(nextOne, 6));
+			harness.processElement(new StreamRecord<>(endEvent, 8));
+			harness.processElement(new StreamRecord<Event>(middleEvent4, 5));
+			harness.processWatermark(100L);
+
+			List<List<Event>> resultingPatterns = new ArrayList<>();
+			while (!harness.getOutput().isEmpty()) {
+				Object o = harness.getOutput().poll();
+				if (!(o instanceof Watermark)) {
+					StreamRecord<Map<String, List<Event>>> el =
+						(StreamRecord<Map<String, List<Event>>>) o;
+					List<Event> res = new ArrayList<>();
+					for (List<Event> le : el.getValue().values()) {
+						res.addAll(le);
+					}
+					resultingPatterns.add(res);
 				}
-				resultingPatterns.add(res);
 			}
-		}
 
-		compareMaps(resultingPatterns,
+			compareMaps(resultingPatterns,
 				Lists.<List<Event>>newArrayList(
-						Lists.newArrayList(startEvent1, endEvent, middleEvent1, middleEvent2, middleEvent4),
-						Lists.newArrayList(startEvent1, endEvent, middleEvent2, middleEvent1),
-						Lists.newArrayList(startEvent1, endEvent, middleEvent3, middleEvent1),
-						Lists.newArrayList(startEvent2, endEvent, middleEvent3, middleEvent4),
-						Lists.newArrayList(startEvent1, endEvent, middleEvent4, middleEvent1),
-						Lists.newArrayList(startEvent1, endEvent, middleEvent1),
-						Lists.newArrayList(startEvent2, endEvent, middleEvent3)
+					Lists.newArrayList(startEvent1, endEvent, middleEvent1, middleEvent2,
+						middleEvent4),
+					Lists.newArrayList(startEvent1, endEvent, middleEvent2, middleEvent1),
+					Lists.newArrayList(startEvent1, endEvent, middleEvent3, middleEvent1),
+					Lists.newArrayList(startEvent2, endEvent, middleEvent3, middleEvent4),
+					Lists.newArrayList(startEvent1, endEvent, middleEvent4, middleEvent1),
+					Lists.newArrayList(startEvent1, endEvent, middleEvent1),
+					Lists.newArrayList(startEvent2, endEvent, middleEvent3)
 				)
-		);
-		harness.close();
+			);
+		} finally {
+			harness.close();
+		}
 	}
 
 	private void verifyWatermark(Object outputObject, long timestamp) {

http://git-wip-us.apache.org/repos/asf/flink/blob/38c45f80/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPRescalingTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPRescalingTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPRescalingTest.java
index 45d7215..86be09c 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPRescalingTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPRescalingTest.java
@@ -79,70 +79,86 @@ public class CEPRescalingTest {
 		assertEquals(1, KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup(maxParallelism, 2, keygroup));
 
 		// now we start the test, we go from parallelism 1 to 2.
+		OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness = null;
+		OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness1 = null;
+		OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness2 = null;
 
-		OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness =
-			getTestHarness(maxParallelism, 1, 0);
-		harness.open();
+		try {
+			harness = getTestHarness(maxParallelism, 1, 0);
+			harness.open();
 
-		harness.processElement(new StreamRecord<>(startEvent1, 1));						// valid element
-		harness.processElement(new StreamRecord<>(new Event(7, "foobar", 1.0), 2));
+			harness.processElement(
+				new StreamRecord<>(startEvent1, 1));                        // valid element
+			harness.processElement(new StreamRecord<>(new Event(7, "foobar", 1.0), 2));
 
-		harness.processElement(new StreamRecord<>(startEvent2, 3));						// valid element
-		harness.processElement(new StreamRecord<Event>(middleEvent2, 4));				// valid element
+			harness.processElement(
+				new StreamRecord<>(startEvent2, 3));                        // valid element
+			harness.processElement(
+				new StreamRecord<Event>(middleEvent2, 4));                // valid element
 
-		// take a snapshot with some elements in internal sorting queue
-		OperatorStateHandles snapshot = harness.snapshot(0, 0);
-		harness.close();
+			// take a snapshot with some elements in internal sorting queue
+			OperatorStateHandles snapshot = harness.snapshot(0, 0);
+			harness.close();
 
-		// initialize two sub-tasks with the previously snapshotted state to simulate scaling up
+			// initialize two sub-tasks with the previously snapshotted state to simulate scaling up
 
-		// we know that the valid element will go to index 0,
-		// so we initialize the two tasks and we put the rest of
-		// the valid elements for the pattern on task 0.
+			// we know that the valid element will go to index 0,
+			// so we initialize the two tasks and we put the rest of
+			// the valid elements for the pattern on task 0.
 
-		OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness1 =
-			getTestHarness(maxParallelism, 2, 0);
+			harness1 = getTestHarness(maxParallelism, 2, 0);
 
-		harness1.setup();
-		harness1.initializeState(snapshot);
-		harness1.open();
+			harness1.setup();
+			harness1.initializeState(snapshot);
+			harness1.open();
 
-		// if element timestamps are not correctly checkpointed/restored this will lead to
-		// a pruning time underflow exception in NFA
-		harness1.processWatermark(new Watermark(2));
+			// if element timestamps are not correctly checkpointed/restored this will lead to
+			// a pruning time underflow exception in NFA
+			harness1.processWatermark(new Watermark(2));
 
-		harness1.processElement(new StreamRecord<Event>(middleEvent1, 3));				// valid element
-		harness1.processElement(new StreamRecord<>(endEvent1, 5));						// valid element
+			harness1.processElement(
+				new StreamRecord<Event>(middleEvent1, 3));                // valid element
+			harness1.processElement(
+				new StreamRecord<>(endEvent1, 5));                        // valid element
 
-		harness1.processWatermark(new Watermark(Long.MAX_VALUE));
+			harness1.processWatermark(new Watermark(Long.MAX_VALUE));
 
-		// watermarks and the result
-		assertEquals(3, harness1.getOutput().size());
-		verifyWatermark(harness1.getOutput().poll(), 2);
-		verifyPattern(harness1.getOutput().poll(), startEvent1, middleEvent1, endEvent1);
+			// watermarks and the result
+			assertEquals(3, harness1.getOutput().size());
+			verifyWatermark(harness1.getOutput().poll(), 2);
+			verifyPattern(harness1.getOutput().poll(), startEvent1, middleEvent1, endEvent1);
 
-		OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness2 =
-			getTestHarness(maxParallelism, 2, 1);
+			harness2 = getTestHarness(maxParallelism, 2, 1);
 
-		harness2.setup();
-		harness2.initializeState(snapshot);
-		harness2.open();
+			harness2.setup();
+			harness2.initializeState(snapshot);
+			harness2.open();
 
-		// now we move to the second parallel task
-		harness2.processWatermark(new Watermark(2));
+			// now we move to the second parallel task
+			harness2.processWatermark(new Watermark(2));
 
-		harness2.processElement(new StreamRecord<>(endEvent2, 5));
-		harness2.processElement(new StreamRecord<>(new Event(42, "start", 1.0), 4));
+			harness2.processElement(new StreamRecord<>(endEvent2, 5));
+			harness2.processElement(new StreamRecord<>(new Event(42, "start", 1.0), 4));
 
-		harness2.processWatermark(new Watermark(Long.MAX_VALUE));
+			harness2.processWatermark(new Watermark(Long.MAX_VALUE));
 
-		assertEquals(3, harness2.getOutput().size());
-		verifyWatermark(harness2.getOutput().poll(), 2);
-		verifyPattern(harness2.getOutput().poll(), startEvent2, middleEvent2, endEvent2);
+			assertEquals(3, harness2.getOutput().size());
+			verifyWatermark(harness2.getOutput().poll(), 2);
+			verifyPattern(harness2.getOutput().poll(), startEvent2, middleEvent2, endEvent2);
+		} finally {
+			closeSilently(harness);
+			closeSilently(harness1);
+			closeSilently(harness2);
+		}
+	}
 
-		harness.close();
-		harness1.close();
-		harness2.close();
+	private static void closeSilently(OneInputStreamOperatorTestHarness<?, ?> harness) {
+		if (harness != null) {
+			try {
+				harness.close();
+			} catch (Throwable ignored) {
+			}
+		}
 	}
 
 	@Test
@@ -211,109 +227,120 @@ public class CEPRescalingTest {
 			getTestHarness(maxParallelism, 3, 2);
 		harness3.open();
 
-		harness1.processWatermark(Long.MIN_VALUE);
-		harness2.processWatermark(Long.MIN_VALUE);
-		harness3.processWatermark(Long.MIN_VALUE);
-
-		harness1.processElement(new StreamRecord<>(startEvent1, 1));						// valid element
-		harness1.processElement(new StreamRecord<>(new Event(7, "foobar", 1.0), 2));
-		harness1.processElement(new StreamRecord<Event>(middleEvent1, 3));					// valid element
-		harness1.processElement(new StreamRecord<>(endEvent1, 5));							// valid element
-
-		// till here we have a valid sequence, so after creating the
-		// new instance and sending it a watermark, we expect it to fire,
-		// even with no new elements.
-
-		harness1.processElement(new StreamRecord<>(startEvent3, 10));
-		harness1.processElement(new StreamRecord<>(startEvent1, 10));
+		OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness4 = null;
+		OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness5 = null;
 
-		harness2.processElement(new StreamRecord<>(startEvent2, 7));
-		harness2.processElement(new StreamRecord<Event>(middleEvent2, 8));
+		try {
+			harness1.processWatermark(Long.MIN_VALUE);
+			harness2.processWatermark(Long.MIN_VALUE);
+			harness3.processWatermark(Long.MIN_VALUE);
 
-		harness3.processElement(new StreamRecord<>(startEvent4, 15));
-		harness3.processElement(new StreamRecord<Event>(middleEvent4, 16));
-		harness3.processElement(new StreamRecord<>(endEvent4, 17));
+			harness1.processElement(
+				new StreamRecord<>(startEvent1, 1));                        // valid element
+			harness1.processElement(new StreamRecord<>(new Event(7, "foobar", 1.0), 2));
+			harness1.processElement(
+				new StreamRecord<Event>(middleEvent1, 3));                    // valid element
+			harness1.processElement(
+				new StreamRecord<>(endEvent1, 5));                            // valid element
 
-		// so far we only have the initial watermark
-		assertEquals(1, harness1.getOutput().size());
-		verifyWatermark(harness1.getOutput().poll(), Long.MIN_VALUE);
+			// till here we have a valid sequence, so after creating the
+			// new instance and sending it a watermark, we expect it to fire,
+			// even with no new elements.
 
-		assertEquals(1, harness2.getOutput().size());
-		verifyWatermark(harness2.getOutput().poll(), Long.MIN_VALUE);
+			harness1.processElement(new StreamRecord<>(startEvent3, 10));
+			harness1.processElement(new StreamRecord<>(startEvent1, 10));
 
-		assertEquals(1, harness3.getOutput().size());
-		verifyWatermark(harness3.getOutput().poll(), Long.MIN_VALUE);
+			harness2.processElement(new StreamRecord<>(startEvent2, 7));
+			harness2.processElement(new StreamRecord<Event>(middleEvent2, 8));
 
-		// we take a snapshot and make it look as a single operator
-		// this will be the initial state of all downstream tasks.
-		OperatorStateHandles snapshot = AbstractStreamOperatorTestHarness.repackageState(
-			harness2.snapshot(0, 0),
-			harness1.snapshot(0, 0),
-			harness3.snapshot(0, 0)
-		);
+			harness3.processElement(new StreamRecord<>(startEvent4, 15));
+			harness3.processElement(new StreamRecord<Event>(middleEvent4, 16));
+			harness3.processElement(new StreamRecord<>(endEvent4, 17));
 
-		OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness4 =
-			getTestHarness(maxParallelism, 2, 0);
-		harness4.setup();
-		harness4.initializeState(snapshot);
-		harness4.open();
+			// so far we only have the initial watermark
+			assertEquals(1, harness1.getOutput().size());
+			verifyWatermark(harness1.getOutput().poll(), Long.MIN_VALUE);
 
-		OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness5 =
-			getTestHarness(maxParallelism, 2, 1);
-		harness5.setup();
-		harness5.initializeState(snapshot);
-		harness5.open();
+			assertEquals(1, harness2.getOutput().size());
+			verifyWatermark(harness2.getOutput().poll(), Long.MIN_VALUE);
 
-		harness5.processElement(new StreamRecord<>(endEvent2, 11));
-		harness5.processWatermark(new Watermark(12));
+			assertEquals(1, harness3.getOutput().size());
+			verifyWatermark(harness3.getOutput().poll(), Long.MIN_VALUE);
 
-		verifyPattern(harness5.getOutput().poll(), startEvent2, middleEvent2, endEvent2);
-		verifyWatermark(harness5.getOutput().poll(), 12);
+			// we take a snapshot and make it look as a single operator
+			// this will be the initial state of all downstream tasks.
+			OperatorStateHandles snapshot = AbstractStreamOperatorTestHarness.repackageState(
+				harness2.snapshot(0, 0),
+				harness1.snapshot(0, 0),
+				harness3.snapshot(0, 0)
+			);
 
-		// if element timestamps are not correctly checkpointed/restored this will lead to
-		// a pruning time underflow exception in NFA
-		harness4.processWatermark(new Watermark(12));
+			harness4 = getTestHarness(maxParallelism, 2, 0);
+			harness4.setup();
+			harness4.initializeState(snapshot);
+			harness4.open();
 
-		assertEquals(2, harness4.getOutput().size());
-		verifyPattern(harness4.getOutput().poll(), startEvent1, middleEvent1, endEvent1);
-		verifyWatermark(harness4.getOutput().poll(), 12);
+			harness5 = getTestHarness(maxParallelism, 2, 1);
+			harness5.setup();
+			harness5.initializeState(snapshot);
+			harness5.open();
 
-		harness4.processElement(new StreamRecord<Event>(middleEvent3, 15));			// valid element
-		harness4.processElement(new StreamRecord<>(endEvent3, 16));					// valid element
+			harness5.processElement(new StreamRecord<>(endEvent2, 11));
+			harness5.processWatermark(new Watermark(12));
 
-		harness4.processElement(new StreamRecord<Event>(middleEvent1, 15));			// valid element
-		harness4.processElement(new StreamRecord<>(endEvent1, 16));					// valid element
+			verifyPattern(harness5.getOutput().poll(), startEvent2, middleEvent2, endEvent2);
+			verifyWatermark(harness5.getOutput().poll(), 12);
 
-		harness4.processWatermark(new Watermark(Long.MAX_VALUE));
-		harness5.processWatermark(new Watermark(Long.MAX_VALUE));
+			// if element timestamps are not correctly checkpointed/restored this will lead to
+			// a pruning time underflow exception in NFA
+			harness4.processWatermark(new Watermark(12));
 
-		// verify result
-		assertEquals(3, harness4.getOutput().size());
-
-		// check the order of the events in the output
-		Queue<Object> output = harness4.getOutput();
-		StreamRecord<?> resultRecord = (StreamRecord<?>) output.peek();
-		assertTrue(resultRecord.getValue() instanceof Map);
-
-		@SuppressWarnings("unchecked")
-		Map<String, List<Event>> patternMap = (Map<String, List<Event>>) resultRecord.getValue();
-		if (patternMap.get("start").get(0).getId() == 7) {
+			assertEquals(2, harness4.getOutput().size());
 			verifyPattern(harness4.getOutput().poll(), startEvent1, middleEvent1, endEvent1);
-			verifyPattern(harness4.getOutput().poll(), startEvent3, middleEvent3, endEvent3);
-		} else {
-			verifyPattern(harness4.getOutput().poll(), startEvent3, middleEvent3, endEvent3);
-			verifyPattern(harness4.getOutput().poll(), startEvent1, middleEvent1, endEvent1);
-		}
-
-		// after scaling down this should end up here
-		assertEquals(2, harness5.getOutput().size());
-		verifyPattern(harness5.getOutput().poll(), startEvent4, middleEvent4, endEvent4);
+			verifyWatermark(harness4.getOutput().poll(), 12);
+
+			harness4.processElement(
+				new StreamRecord<Event>(middleEvent3, 15));            // valid element
+			harness4.processElement(
+				new StreamRecord<>(endEvent3, 16));                    // valid element
+
+			harness4.processElement(
+				new StreamRecord<Event>(middleEvent1, 15));            // valid element
+			harness4.processElement(
+				new StreamRecord<>(endEvent1, 16));                    // valid element
+
+			harness4.processWatermark(new Watermark(Long.MAX_VALUE));
+			harness5.processWatermark(new Watermark(Long.MAX_VALUE));
+
+			// verify result
+			assertEquals(3, harness4.getOutput().size());
+
+			// check the order of the events in the output
+			Queue<Object> output = harness4.getOutput();
+			StreamRecord<?> resultRecord = (StreamRecord<?>) output.peek();
+			assertTrue(resultRecord.getValue() instanceof Map);
+
+			@SuppressWarnings("unchecked")
+			Map<String, List<Event>> patternMap =
+				(Map<String, List<Event>>) resultRecord.getValue();
+			if (patternMap.get("start").get(0).getId() == 7) {
+				verifyPattern(harness4.getOutput().poll(), startEvent1, middleEvent1, endEvent1);
+				verifyPattern(harness4.getOutput().poll(), startEvent3, middleEvent3, endEvent3);
+			} else {
+				verifyPattern(harness4.getOutput().poll(), startEvent3, middleEvent3, endEvent3);
+				verifyPattern(harness4.getOutput().poll(), startEvent1, middleEvent1, endEvent1);
+			}
 
-		harness1.close();
-		harness2.close();
-		harness3.close();
-		harness4.close();
-		harness5.close();
+			// after scaling down this should end up here
+			assertEquals(2, harness5.getOutput().size());
+			verifyPattern(harness5.getOutput().poll(), startEvent4, middleEvent4, endEvent4);
+		} finally {
+			closeSilently(harness1);
+			closeSilently(harness2);
+			closeSilently(harness3);
+			closeSilently(harness4);
+			closeSilently(harness5);
+		}
 	}
 
 	private void verifyWatermark(Object outputObject, long timestamp) {


Mime
View raw message