flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kklou...@apache.org
Subject [1/9] flink git commit: [FLINK-6371] [cep] NFA return matched patterns as Map<String, List<T>>.
Date Wed, 17 May 2017 12:42:41 GMT
Repository: flink
Updated Branches:
  refs/heads/release-1.3 fe1316b33 -> 849dd9d85


http://git-wip-us.apache.org/repos/asf/flink/blob/fa64a60f/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
index 2cc67e5..46e2fd4 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
@@ -19,7 +19,6 @@
 package org.apache.flink.cep.nfa;
 
 import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
 import com.google.common.primitives.Doubles;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.cep.Event;
@@ -156,22 +155,11 @@ public class NFAITCase extends TestLogger {
 
 		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
 
-		List<Map<String, Event>> resultingPatterns = new ArrayList<>();
-
-		for (StreamRecord<Event> inputEvent: inputEvents) {
-			Collection<Map<String, Event>> patterns = nfa.process(
-				inputEvent.getValue(),
-				inputEvent.getTimestamp()).f0;
-
-			resultingPatterns.addAll(patterns);
-		}
-
-		assertEquals(1, resultingPatterns.size());
-		Map<String, Event> patternMap = resultingPatterns.get(0);
+		List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
 
-		assertEquals(startEvent, patternMap.get("start"));
-		assertEquals(middleEvent, patternMap.get("middle"));
-		assertEquals(endEvent, patternMap.get("end"));
+		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
+				Lists.newArrayList(startEvent, middleEvent, endEvent)
+		));
 	}
 
 	@Test
@@ -202,24 +190,11 @@ public class NFAITCase extends TestLogger {
 
 		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
 
-		Set<Set<Event>> resultingPatterns = new HashSet<>();
-		List<Collection<Event>> allPatterns = new ArrayList<>();
-
-		for (StreamRecord<Event> inputEvent : inputEvents) {
-			Collection<Map<String, Event>> patterns = nfa.process(
-				inputEvent.getValue(),
-				inputEvent.getTimestamp()).f0;
-
-			for (Map<String, Event> foundPattern : patterns) {
-				resultingPatterns.add(new HashSet<>(foundPattern.values()));
-				allPatterns.add(foundPattern.values());
-			}
-		}
+		List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
 
-		assertEquals(1, allPatterns.size());
-		assertEquals(Sets.<Set<Event>>newHashSet(
-			Sets.newHashSet(middleEvent1, end)
-		), resultingPatterns);
+		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
+				Lists.newArrayList(middleEvent1, end)
+		));
 	}
 
 	@Test
@@ -252,19 +227,9 @@ public class NFAITCase extends TestLogger {
 
 		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
 
-		Set<Set<Event>> resultingPatterns = new HashSet<>();
-
-		for (StreamRecord<Event> inputEvent : inputEvents) {
-			Collection<Map<String, Event>> patterns = nfa.process(
-				inputEvent.getValue(),
-				inputEvent.getTimestamp()).f0;
-
-			for (Map<String, Event> foundPattern : patterns) {
-				resultingPatterns.add(new HashSet<>(foundPattern.values()));
-			}
-		}
+		List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
 
-		assertEquals(Sets.newHashSet(), resultingPatterns);
+		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList());
 	}
 
 	/**
@@ -274,7 +239,6 @@ public class NFAITCase extends TestLogger {
 	@Test
 	public void testSimplePatternWithTimeWindowNFA() {
 		List<StreamRecord<Event>> events = new ArrayList<>();
-		List<Map<String, Event>> resultingPatterns = new ArrayList<>();
 
 		final Event startEvent;
 		final Event middleEvent;
@@ -313,21 +277,11 @@ public class NFAITCase extends TestLogger {
 
 		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
 
-		for (StreamRecord<Event> event: events) {
-			Collection<Map<String, Event>> patterns = nfa.process(
-					event.getValue(),
-					event.getTimestamp()).f0;
-
-			resultingPatterns.addAll(patterns);
-		}
-
-		assertEquals(1, resultingPatterns.size());
-
-		Map<String, Event> patternMap = resultingPatterns.get(0);
+		List<List<Event>> resultingPatterns = feedNFA(events, nfa);
 
-		assertEquals(startEvent, patternMap.get("start"));
-		assertEquals(middleEvent, patternMap.get("middle"));
-		assertEquals(endEvent, patternMap.get("end"));
+		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
+				Lists.newArrayList(startEvent, middleEvent, endEvent)
+		));
 	}
 
 	/**
@@ -337,9 +291,9 @@ public class NFAITCase extends TestLogger {
 	@Test
 	public void testSimplePatternWithTimeoutHandling() {
 		List<StreamRecord<Event>> events = new ArrayList<>();
-		List<Map<String, Event>> resultingPatterns = new ArrayList<>();
-		Set<Tuple2<Map<String, Event>, Long>> resultingTimeoutPatterns = new HashSet<>();
-		Set<Tuple2<Map<String, Event>, Long>> expectedTimeoutPatterns = new HashSet<>();
+		List<Map<String, List<Event>>> resultingPatterns = new ArrayList<>();
+		Set<Tuple2<Map<String, List<Event>>, Long>> resultingTimeoutPatterns = new HashSet<>();
+		Set<Tuple2<Map<String, List<Event>>, Long>> expectedTimeoutPatterns = new HashSet<>();
 
 		events.add(new StreamRecord<>(new Event(1, "start", 1.0), 1));
 		events.add(new StreamRecord<>(new Event(2, "start", 1.0), 2));
@@ -348,19 +302,19 @@ public class NFAITCase extends TestLogger {
 		events.add(new StreamRecord<>(new Event(5, "end", 1.0), 11));
 		events.add(new StreamRecord<>(new Event(6, "end", 1.0), 13));
 
-		Map<String, Event> timeoutPattern1 = new HashMap<>();
-		timeoutPattern1.put("start", new Event(1, "start", 1.0));
-		timeoutPattern1.put("middle", new Event(3, "middle", 1.0));
+		Map<String, List<Event>> timeoutPattern1 = new HashMap<>();
+		timeoutPattern1.put("start", Collections.singletonList(new Event(1, "start", 1.0)));
+		timeoutPattern1.put("middle", Collections.singletonList(new Event(3, "middle", 1.0)));
 
-		Map<String, Event> timeoutPattern2 = new HashMap<>();
-		timeoutPattern2.put("start", new Event(2, "start", 1.0));
-		timeoutPattern2.put("middle", new Event(3, "middle", 1.0));
+		Map<String, List<Event>> timeoutPattern2 = new HashMap<>();
+		timeoutPattern2.put("start", Collections.singletonList(new Event(2, "start", 1.0)));
+		timeoutPattern2.put("middle", Collections.singletonList(new Event(3, "middle", 1.0)));
 
-		Map<String, Event> timeoutPattern3 = new HashMap<>();
-		timeoutPattern3.put("start", new Event(1, "start", 1.0));
+		Map<String, List<Event>> timeoutPattern3 = new HashMap<>();
+		timeoutPattern3.put("start", Collections.singletonList(new Event(1, "start", 1.0)));
 
-		Map<String, Event> timeoutPattern4 = new HashMap<>();
-		timeoutPattern4.put("start", new Event(2, "start", 1.0));
+		Map<String, List<Event>> timeoutPattern4 = new HashMap<>();
+		timeoutPattern4.put("start", Collections.singletonList(new Event(2, "start", 1.0)));
 
 		expectedTimeoutPatterns.add(Tuple2.of(timeoutPattern1, 11L));
 		expectedTimeoutPatterns.add(Tuple2.of(timeoutPattern2, 13L));
@@ -393,10 +347,11 @@ public class NFAITCase extends TestLogger {
 		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), true);
 
 		for (StreamRecord<Event> event: events) {
-			final Tuple2<Collection<Map<String, Event>>, Collection<Tuple2<Map<String, Event>, Long>>> patterns = nfa.process(event.getValue(), event.getTimestamp());
+			Tuple2<Collection<Map<String, List<Event>>>, Collection<Tuple2<Map<String, List<Event>>, Long>>> patterns =
+					nfa.process(event.getValue(), event.getTimestamp());
 
-			Collection<Map<String, Event>> matchedPatterns = patterns.f0;
-			Collection<Tuple2<Map<String, Event>, Long>> timeoutPatterns = patterns.f1;
+			Collection<Map<String, List<Event>>> matchedPatterns = patterns.f0;
+			Collection<Tuple2<Map<String, List<Event>>, Long>> timeoutPatterns = patterns.f1;
 
 			resultingPatterns.addAll(matchedPatterns);
 			resultingTimeoutPatterns.addAll(timeoutPatterns);
@@ -460,31 +415,16 @@ public class NFAITCase extends TestLogger {
 
 		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
 
-		List<Map<String, Event>> resultingPatterns = new ArrayList<>();
-
-		for (StreamRecord<Event> inputEvent: inputEvents) {
-			Collection<Map<String, Event>> patterns = nfa.process(
-				inputEvent.getValue(),
-				inputEvent.getTimestamp()).f0;
-
-			resultingPatterns.addAll(patterns);
-		}
-
-		assertEquals(6, resultingPatterns.size());
-
-		final Set<Set<Event>> patterns = new HashSet<>();
-		for (Map<String, Event> resultingPattern : resultingPatterns) {
-			patterns.add(new HashSet<>(resultingPattern.values()));
-		}
+		List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
 
-		assertEquals(Sets.newHashSet(
-			Sets.newHashSet(startEvent, middleEvent1, nextOne1, endEvent),
-			Sets.newHashSet(startEvent, middleEvent2, nextOne1, endEvent),
-			Sets.newHashSet(startEvent, middleEvent3, nextOne1, endEvent),
-			Sets.newHashSet(startEvent, middleEvent1, nextOne2, endEvent),
-			Sets.newHashSet(startEvent, middleEvent2, nextOne2, endEvent),
-			Sets.newHashSet(startEvent, middleEvent3, nextOne2, endEvent)
-		), patterns);
+		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
+				Lists.newArrayList(startEvent, middleEvent1, nextOne1, endEvent),
+				Lists.newArrayList(startEvent, middleEvent2, nextOne1, endEvent),
+				Lists.newArrayList(startEvent, middleEvent3, nextOne1, endEvent),
+				Lists.newArrayList(startEvent, middleEvent1, nextOne2, endEvent),
+				Lists.newArrayList(startEvent, middleEvent2, nextOne2, endEvent),
+				Lists.newArrayList(startEvent, middleEvent3, nextOne2, endEvent)
+		));
 	}
 
 	@Test
@@ -548,39 +488,26 @@ public class NFAITCase extends TestLogger {
 
 		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
 
-		Set<Set<Event>> resultingPatterns = new HashSet<>();
-		List<Collection<Event>> allPatterns = new ArrayList<>();
-
-		for (StreamRecord<Event> inputEvent : inputEvents) {
-			Collection<Map<String, Event>> patterns = nfa.process(
-				inputEvent.getValue(),
-				inputEvent.getTimestamp()).f0;
-
-			for (Map<String, Event> foundPattern : patterns) {
-				resultingPatterns.add(new HashSet<>(foundPattern.values()));
-				allPatterns.add(foundPattern.values());
-			}
-		}
+		List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
 
-		assertEquals(16, allPatterns.size());
-		assertEquals(Sets.newHashSet(
-			Sets.newHashSet(startEvent, middleEvent1, middleEvent2, middleEvent3, end1, end2, end4),
-			Sets.newHashSet(startEvent, middleEvent1, middleEvent2, end1, end2, end4),
-			Sets.newHashSet(startEvent, middleEvent1, middleEvent3, end1, end2, end4),
-			Sets.newHashSet(startEvent, middleEvent2, middleEvent3, end1, end2, end4),
-			Sets.newHashSet(startEvent, middleEvent1, end1, end2, end4),
-			Sets.newHashSet(startEvent, middleEvent2, end1, end2, end4),
-			Sets.newHashSet(startEvent, middleEvent3, end1, end2, end4),
-			Sets.newHashSet(startEvent, end1, end2, end4),
-			Sets.newHashSet(startEvent, middleEvent1, middleEvent2, middleEvent3, end1, end3, end4),
-			Sets.newHashSet(startEvent, middleEvent1, middleEvent2, end1, end3, end4),
-			Sets.newHashSet(startEvent, middleEvent1, middleEvent3, end1, end3, end4),
-			Sets.newHashSet(startEvent, middleEvent2, middleEvent3, end1, end3, end4),
-			Sets.newHashSet(startEvent, middleEvent1, end1, end3, end4),
-			Sets.newHashSet(startEvent, middleEvent2, end1, end3, end4),
-			Sets.newHashSet(startEvent, middleEvent3, end1, end3, end4),
-			Sets.newHashSet(startEvent, end1, end3, end4)
-		), resultingPatterns);
+		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
+				Lists.newArrayList(startEvent, middleEvent1, middleEvent2, middleEvent3, end1, end2, end4),
+				Lists.newArrayList(startEvent, middleEvent1, middleEvent2, end1, end2, end4),
+				Lists.newArrayList(startEvent, middleEvent1, middleEvent3, end1, end2, end4),
+				Lists.newArrayList(startEvent, middleEvent2, middleEvent3, end1, end2, end4),
+				Lists.newArrayList(startEvent, middleEvent1, end1, end2, end4),
+				Lists.newArrayList(startEvent, middleEvent2, end1, end2, end4),
+				Lists.newArrayList(startEvent, middleEvent3, end1, end2, end4),
+				Lists.newArrayList(startEvent, end1, end2, end4),
+				Lists.newArrayList(startEvent, middleEvent1, middleEvent2, middleEvent3, end1, end3, end4),
+				Lists.newArrayList(startEvent, middleEvent1, middleEvent2, end1, end3, end4),
+				Lists.newArrayList(startEvent, middleEvent1, middleEvent3, end1, end3, end4),
+				Lists.newArrayList(startEvent, middleEvent2, middleEvent3, end1, end3, end4),
+				Lists.newArrayList(startEvent, middleEvent1, end1, end3, end4),
+				Lists.newArrayList(startEvent, middleEvent2, end1, end3, end4),
+				Lists.newArrayList(startEvent, middleEvent3, end1, end3, end4),
+				Lists.newArrayList(startEvent, end1, end3, end4)
+		));
 	}
 
 	@Test
@@ -674,27 +601,14 @@ public class NFAITCase extends TestLogger {
 
 		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
 
-		Set<Set<Event>> resultingPatterns = new HashSet<>();
-		List<Collection<Event>> allPatterns = new ArrayList<>();
-
-		for (StreamRecord<Event> inputEvent : inputEvents) {
-			Collection<Map<String, Event>> patterns = nfa.process(
-				inputEvent.getValue(),
-				inputEvent.getTimestamp()).f0;
-
-			for (Map<String, Event> foundPattern : patterns) {
-				resultingPatterns.add(new HashSet<>(foundPattern.values()));
-				allPatterns.add(foundPattern.values());
-			}
-		}
+		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
 
-		assertEquals(4, allPatterns.size());
-		assertEquals(Sets.newHashSet(
-			Sets.newHashSet(startEvent, middleEvent1, middleEvent2, middleEvent3, end1),
-			Sets.newHashSet(startEvent, middleEvent1, middleEvent2, end1),
-			Sets.newHashSet(startEvent, middleEvent1, end1),
-			Sets.newHashSet(startEvent, end1)
-		), resultingPatterns);
+		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
+				Lists.newArrayList(startEvent, middleEvent1, middleEvent2, middleEvent3, end1),
+				Lists.newArrayList(startEvent, middleEvent1, middleEvent2, end1),
+				Lists.newArrayList(startEvent, middleEvent1, end1),
+				Lists.newArrayList(startEvent, end1)
+		));
 	}
 
 	@Test
@@ -729,30 +643,17 @@ public class NFAITCase extends TestLogger {
 
 		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
 
-		Set<Set<Event>> resultingPatterns = new HashSet<>();
-		List<Collection<Event>> allPatterns = new ArrayList<>();
-
-		for (StreamRecord<Event> inputEvent : inputEvents) {
-			Collection<Map<String, Event>> patterns = nfa.process(
-				inputEvent.getValue(),
-				inputEvent.getTimestamp()).f0;
-
-			for (Map<String, Event> foundPattern : patterns) {
-				resultingPatterns.add(new HashSet<>(foundPattern.values()));
-				allPatterns.add(foundPattern.values());
-			}
-		}
+		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
 
-		assertEquals(7, allPatterns.size());
-		assertEquals(Sets.newHashSet(
-			Sets.newHashSet(middleEvent1, middleEvent2, middleEvent3, end),
-			Sets.newHashSet(middleEvent1, middleEvent2, end),
-			Sets.newHashSet(middleEvent2, middleEvent3, end),
-			Sets.newHashSet(middleEvent1, end),
-			Sets.newHashSet(middleEvent2, end),
-			Sets.newHashSet(middleEvent3, end),
-			Sets.newHashSet(end)
-		), resultingPatterns);
+		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
+				Lists.newArrayList(middleEvent1, middleEvent2, middleEvent3, end),
+				Lists.newArrayList(middleEvent1, middleEvent2, end),
+				Lists.newArrayList(middleEvent2, middleEvent3, end),
+				Lists.newArrayList(middleEvent1, end),
+				Lists.newArrayList(middleEvent2, end),
+				Lists.newArrayList(middleEvent3, end),
+				Lists.newArrayList(end)
+		));
 	}
 
 	@Test
@@ -805,29 +706,16 @@ public class NFAITCase extends TestLogger {
 
 		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
 
-		Set<Set<Event>> resultingPatterns = new HashSet<>();
-		List<Collection<Event>> allPatterns = new ArrayList<>();
-
-		for (StreamRecord<Event> inputEvent : inputEvents) {
-			Collection<Map<String, Event>> patterns = nfa.process(
-				inputEvent.getValue(),
-				inputEvent.getTimestamp()).f0;
-
-			for (Map<String, Event> foundPattern : patterns) {
-				resultingPatterns.add(new HashSet<>(foundPattern.values()));
-				allPatterns.add(foundPattern.values());
-			}
-		}
+		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
 
-		assertEquals(6, allPatterns.size());
-		assertEquals(Sets.newHashSet(
-			Sets.newHashSet(startEvent, middleEvent1, middleEvent2, middleEvent3, end),
-			Sets.newHashSet(startEvent, middleEvent1, middleEvent2, end),
-			Sets.newHashSet(startEvent, middleEvent2, middleEvent3, end),
-			Sets.newHashSet(startEvent, middleEvent2, end),
-			Sets.newHashSet(startEvent, middleEvent1, end),
-			Sets.newHashSet(startEvent, end)
-		), resultingPatterns);
+		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
+				Lists.newArrayList(startEvent, middleEvent1, middleEvent2, middleEvent3, end),
+				Lists.newArrayList(startEvent, middleEvent1, middleEvent2, end),
+				Lists.newArrayList(startEvent, middleEvent2, middleEvent3, end),
+				Lists.newArrayList(startEvent, middleEvent2, end),
+				Lists.newArrayList(startEvent, middleEvent1, end),
+				Lists.newArrayList(startEvent, end)
+		));
 	}
 
 	@Test
@@ -889,31 +777,18 @@ public class NFAITCase extends TestLogger {
 
 		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
 
-		Set<Set<Event>> resultingPatterns = new HashSet<>();
-		List<Collection<Event>> allPatterns = new ArrayList<>();
-
-		for (StreamRecord<Event> inputEvent : inputEvents) {
-			Collection<Map<String, Event>> patterns = nfa.process(
-				inputEvent.getValue(),
-				inputEvent.getTimestamp()).f0;
-
-			for (Map<String, Event> foundPattern : patterns) {
-				resultingPatterns.add(new HashSet<>(foundPattern.values()));
-				allPatterns.add(foundPattern.values());
-			}
-		}
+		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
 
-		assertEquals(8, allPatterns.size());
-		assertEquals(Sets.newHashSet(
-			Sets.newHashSet(startEvent, middleEvent1, merging, end),
-			Sets.newHashSet(startEvent, middleEvent1, merging, kleene1, end),
-			Sets.newHashSet(startEvent, middleEvent1, merging, kleene2, end),
-			Sets.newHashSet(startEvent, middleEvent1, merging, kleene1, kleene2, end),
-			Sets.newHashSet(startEvent, middleEvent2, merging, end),
-			Sets.newHashSet(startEvent, middleEvent2, merging, kleene1, end),
-			Sets.newHashSet(startEvent, middleEvent2, merging, kleene2, end),
-			Sets.newHashSet(startEvent, middleEvent2, merging, kleene1, kleene2, end)
-		), resultingPatterns);
+		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
+				Lists.newArrayList(startEvent, middleEvent1, merging, end),
+				Lists.newArrayList(startEvent, middleEvent1, merging, kleene1, end),
+				Lists.newArrayList(startEvent, middleEvent1, merging, kleene2, end),
+				Lists.newArrayList(startEvent, middleEvent1, merging, kleene1, kleene2, end),
+				Lists.newArrayList(startEvent, middleEvent2, merging, end),
+				Lists.newArrayList(startEvent, middleEvent2, merging, kleene1, end),
+				Lists.newArrayList(startEvent, middleEvent2, merging, kleene2, end),
+				Lists.newArrayList(startEvent, middleEvent2, merging, kleene1, kleene2, end)
+		));
 	}
 
 	@Test
@@ -958,19 +833,9 @@ public class NFAITCase extends TestLogger {
 
 		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
 
-		Set<Set<Event>> resultingPatterns = new HashSet<>();
-
-		for (StreamRecord<Event> inputEvent : inputEvents) {
-			Collection<Map<String, Event>> patterns = nfa.process(
-				inputEvent.getValue(),
-				inputEvent.getTimestamp()).f0;
-
-			for (Map<String, Event> foundPattern : patterns) {
-				resultingPatterns.add(new HashSet<>(foundPattern.values()));
-			}
-		}
+		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
 
-		assertEquals(Sets.newHashSet(), resultingPatterns);
+		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList());
 	}
 
 	@Test
@@ -1059,26 +924,13 @@ public class NFAITCase extends TestLogger {
 
 		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
 
-		Set<Set<Event>> resultingPatterns = new HashSet<>();
-		List<Collection<Event>> allPatterns = new ArrayList<>();
-
-		for (StreamRecord<Event> inputEvent : inputEvents) {
-			Collection<Map<String, Event>> patterns = nfa.process(
-				inputEvent.getValue(),
-				inputEvent.getTimestamp()).f0;
-
-			for (Map<String, Event> foundPattern : patterns) {
-				resultingPatterns.add(new HashSet<>(foundPattern.values()));
-				allPatterns.add(foundPattern.values());
-			}
-		}
+		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
 
-		assertEquals(3, allPatterns.size());
-		assertEquals(Sets.newHashSet(
-			Sets.newHashSet(startEvent, middleEvent1, middleEvent2, end1),
-			Sets.newHashSet(startEvent, middleEvent1, end1),
-			Sets.newHashSet(startEvent, middleEvent2, end1)
-		), resultingPatterns);
+		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
+				Lists.newArrayList(startEvent, middleEvent1, middleEvent2, end1),
+				Lists.newArrayList(startEvent, middleEvent1, end1),
+				Lists.newArrayList(startEvent, middleEvent2, end1)
+		));
 	}
 
 	@Test
@@ -1113,30 +965,17 @@ public class NFAITCase extends TestLogger {
 
 		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
 
-		Set<Set<Event>> resultingPatterns = new HashSet<>();
-		List<Collection<Event>> allPatterns = new ArrayList<>();
-
-		for (StreamRecord<Event> inputEvent : inputEvents) {
-			Collection<Map<String, Event>> patterns = nfa.process(
-				inputEvent.getValue(),
-				inputEvent.getTimestamp()).f0;
-
-			for (Map<String, Event> foundPattern : patterns) {
-				resultingPatterns.add(new HashSet<>(foundPattern.values()));
-				allPatterns.add(foundPattern.values());
-			}
-		}
+		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
 
-		assertEquals(7, allPatterns.size());
-		assertEquals(Sets.newHashSet(
-			Sets.newHashSet(startEvent1, startEvent2, startEvent3, end1),
-			Sets.newHashSet(startEvent1, startEvent2, end1),
-			Sets.newHashSet(startEvent1, startEvent3, end1),
-			Sets.newHashSet(startEvent2, startEvent3, end1),
-			Sets.newHashSet(startEvent1, end1),
-			Sets.newHashSet(startEvent2, end1),
-			Sets.newHashSet(startEvent3, end1)
-		), resultingPatterns);
+		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
+				Lists.newArrayList(startEvent1, startEvent2, startEvent3, end1),
+				Lists.newArrayList(startEvent1, startEvent2, end1),
+				Lists.newArrayList(startEvent1, startEvent3, end1),
+				Lists.newArrayList(startEvent2, startEvent3, end1),
+				Lists.newArrayList(startEvent1, end1),
+				Lists.newArrayList(startEvent2, end1),
+				Lists.newArrayList(startEvent3, end1)
+		));
 	}
 
 	@Test
@@ -1181,24 +1020,11 @@ public class NFAITCase extends TestLogger {
 
 		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
 
-		Set<Set<Event>> resultingPatterns = new HashSet<>();
-		List<Collection<Event>> allPatterns = new ArrayList<>();
-
-		for (StreamRecord<Event> inputEvent : inputEvents) {
-			Collection<Map<String, Event>> patterns = nfa.process(
-				inputEvent.getValue(),
-				inputEvent.getTimestamp()).f0;
-
-			for (Map<String, Event> foundPattern : patterns) {
-				resultingPatterns.add(new HashSet<>(foundPattern.values()));
-				allPatterns.add(foundPattern.values());
-			}
-		}
+		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
 
-		assertEquals(1, allPatterns.size());
-		assertEquals(Sets.<Set<Event>>newHashSet(
-			Sets.newHashSet(startEvent, endEvent)
-		), resultingPatterns);
+		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
+				Lists.newArrayList(startEvent, endEvent)
+		));
 	}
 
 	@Test
@@ -1291,25 +1117,12 @@ public class NFAITCase extends TestLogger {
 
 		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
 
-		Set<Set<Event>> resultingPatterns = new HashSet<>();
-		List<Collection<Event>> allPatterns = new ArrayList<>();
-
-		for (StreamRecord<Event> inputEvent : inputEvents) {
-			Collection<Map<String, Event>> patterns = nfa.process(
-				inputEvent.getValue(),
-				inputEvent.getTimestamp()).f0;
-
-			for (Map<String, Event> foundPattern : patterns) {
-				resultingPatterns.add(new HashSet<>(foundPattern.values()));
-				allPatterns.add(foundPattern.values());
-			}
-		}
+		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
 
-		assertEquals(2, allPatterns.size());
-		assertEquals(Sets.newHashSet(
-			Sets.newHashSet(startEvent, middleEvent, end1),
-			Sets.newHashSet(startEvent, end1)
-		), resultingPatterns);
+		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
+				Lists.newArrayList(startEvent, middleEvent, end1),
+				Lists.newArrayList(startEvent, end1)
+		));
 	}
 
 	@Test
@@ -1602,25 +1415,12 @@ public class NFAITCase extends TestLogger {
 
 		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
 
-		Set<Set<Event>> resultingPatterns = new HashSet<>();
-		List<Collection<Event>> allPatterns = new ArrayList<>();
-
-		for (StreamRecord<Event> inputEvent : inputEvents) {
-			Collection<Map<String, Event>> patterns = nfa.process(
-				inputEvent.getValue(),
-				inputEvent.getTimestamp()).f0;
-
-			for (Map<String, Event> foundPattern : patterns) {
-				resultingPatterns.add(new HashSet<>(foundPattern.values()));
-				allPatterns.add(foundPattern.values());
-			}
-		}
+		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
 
-		assertEquals(2, allPatterns.size());
-		assertEquals(Sets.newHashSet(
-			Sets.newHashSet(startEvent,  end1),
-			Sets.newHashSet(end1)
-		), resultingPatterns);
+		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
+				Lists.newArrayList(startEvent,  end1),
+				Lists.newArrayList(end1)
+		));
 	}
 
 	@Test
@@ -1655,27 +1455,14 @@ public class NFAITCase extends TestLogger {
 
 		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
 
-		Set<Set<Event>> resultingPatterns = new HashSet<>();
-		List<Collection<Event>> allPatterns = new ArrayList<>();
-
-		for (StreamRecord<Event> inputEvent : inputEvents) {
-			Collection<Map<String, Event>> patterns = nfa.process(
-				inputEvent.getValue(),
-				inputEvent.getTimestamp()).f0;
-
-			for (Map<String, Event> foundPattern : patterns) {
-				resultingPatterns.add(new HashSet<>(foundPattern.values()));
-				allPatterns.add(foundPattern.values());
-			}
-		}
+		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
 
-		assertEquals(4, allPatterns.size());
-		assertEquals(Sets.newHashSet(
-			Sets.newHashSet(startEvent,  middleEvent1, middleEvent2, middleEvent3),
-			Sets.newHashSet(startEvent,  middleEvent1, middleEvent2),
-			Sets.newHashSet(startEvent,  middleEvent1),
-			Sets.newHashSet(startEvent)
-		), resultingPatterns);
+		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
+				Lists.newArrayList(startEvent,  middleEvent1, middleEvent2, middleEvent3),
+				Lists.newArrayList(startEvent,  middleEvent1, middleEvent2),
+				Lists.newArrayList(startEvent,  middleEvent1),
+				Lists.newArrayList(startEvent)
+		));
 	}
 
 	@Test
@@ -1749,25 +1536,12 @@ public class NFAITCase extends TestLogger {
 
 		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
 
-		Set<Set<Event>> resultingPatterns = new HashSet<>();
-		List<Collection<Event>> allPatterns = new ArrayList<>();
-
-		for (StreamRecord<Event> inputEvent : inputEvents) {
-			Collection<Map<String, Event>> patterns = nfa.process(
-				inputEvent.getValue(),
-				inputEvent.getTimestamp()).f0;
-
-			for (Map<String, Event> foundPattern : patterns) {
-				resultingPatterns.add(new HashSet<>(foundPattern.values()));
-				allPatterns.add(foundPattern.values());
-			}
-		}
+		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
 
-		assertEquals(2, allPatterns.size());
-		assertEquals(Sets.newHashSet(
-			Sets.newHashSet(startEvent,  middleEvent1),
-			Sets.newHashSet(startEvent)
-		), resultingPatterns);
+		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
+				Lists.newArrayList(startEvent,  middleEvent1),
+				Lists.newArrayList(startEvent)
+		));
 	}
 
 	@Test
@@ -1972,7 +1746,7 @@ public class NFAITCase extends TestLogger {
 			public boolean filter(Event value) throws Exception {
 				return value.getName().equals("a");
 			}
-		}).times(2).consecutive().optional().followedBy("end1").where(new SimpleCondition<Event>() { // TODO: 4/4/17 also check order consecutive() vs optional()
+		}).times(2).consecutive().optional().followedBy("end1").where(new SimpleCondition<Event>() {
 			private static final long serialVersionUID = 5726188262756267490L;
 
 			@Override
@@ -2063,7 +1837,7 @@ public class NFAITCase extends TestLogger {
 			public boolean filter(Event value) throws Exception {
 				return value.getName().equals("a");
 			}
-		}).times(2).consecutive().optional().followedBy("end1").where(new SimpleCondition<Event>() { // TODO: 4/4/17 also check order consecutive() vs optional()
+		}).times(2).consecutive().optional().followedBy("end1").where(new SimpleCondition<Event>() {
 			private static final long serialVersionUID = 5726188262756267490L;
 
 			@Override
@@ -2108,7 +1882,7 @@ public class NFAITCase extends TestLogger {
 			public boolean filter(Event value) throws Exception {
 				return value.getName().equals("a");
 			}
-		}).times(2).allowCombinations().optional().followedBy("end1").where(new SimpleCondition<Event>() { // TODO: 4/4/17 also check order consecutive() vs optional()
+		}).times(2).allowCombinations().optional().followedBy("end1").where(new SimpleCondition<Event>() {
 			private static final long serialVersionUID = 5726188262756267490L;
 
 			@Override
@@ -3165,26 +2939,28 @@ public class NFAITCase extends TestLogger {
 		inputEvents.add(new StreamRecord<>(d, 5));
 
 		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
-			private static final long serialVersionUID = 5726188262756267490L;
+			private static final long serialVersionUID = 5167288560432018992L;
 
 			@Override
 			public boolean filter(Event value) throws Exception {
 				return value.getName().equals("a");
 			}
 		}).notNext("notPattern").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 2242479288129905510L;
+
 			@Override
 			public boolean filter(Event value) throws Exception {
 				return value.getName().equals("b");
 			}
 		}).followedByAny("middle").where(new SimpleCondition<Event>() {
-			private static final long serialVersionUID = 5726188262756267490L;
+			private static final long serialVersionUID = 1404509325548220892L;
 
 			@Override
 			public boolean filter(Event value) throws Exception {
 				return value.getName().equals("c");
 			}
 		}).followedBy("end").where(new SimpleCondition<Event>() {
-			private static final long serialVersionUID = 5726188262756267490L;
+			private static final long serialVersionUID = -8907427230007830915L;
 
 			@Override
 			public boolean filter(Event value) throws Exception {
@@ -3219,26 +2995,28 @@ public class NFAITCase extends TestLogger {
 		inputEvents.add(new StreamRecord<>(d, 5));
 
 		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
-			private static final long serialVersionUID = 5726188262756267490L;
+			private static final long serialVersionUID = -339500190577666439L;
 
 			@Override
 			public boolean filter(Event value) throws Exception {
 				return value.getName().equals("a");
 			}
 		}).notNext("notPattern").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = -6913980632538046451L;
+
 			@Override
 			public boolean filter(Event value) throws Exception {
 				return value.getName().equals("b");
 			}
 		}).followedBy("middle").where(new SimpleCondition<Event>() {
-			private static final long serialVersionUID = 5726188262756267490L;
+			private static final long serialVersionUID = 3332196998905139891L;
 
 			@Override
 			public boolean filter(Event value) throws Exception {
 				return value.getName().equals("c");
 			}
 		}).followedBy("end").where(new SimpleCondition<Event>() {
-			private static final long serialVersionUID = 5726188262756267490L;
+			private static final long serialVersionUID = 2086563479959018387L;
 
 			@Override
 			public boolean filter(Event value) throws Exception {
@@ -3270,27 +3048,29 @@ public class NFAITCase extends TestLogger {
 		inputEvents.add(new StreamRecord<>(b1, 5));
 
 		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
-			private static final long serialVersionUID = 5726188262756267490L;
+			private static final long serialVersionUID = 1672995058886176627L;
 
 			@Override
 			public boolean filter(Event value) throws Exception {
 				return value.getName().equals("a");
 			}
 		}).followedByAny("middle").where(new SimpleCondition<Event>() {
-			private static final long serialVersionUID = 5726188262756267490L;
+			private static final long serialVersionUID = 6003621617520261554L;
 
 			@Override
 			public boolean filter(Event value) throws Exception {
 				return value.getName().equals("c");
 			}
 		}).followedByAny("end").where(new SimpleCondition<Event>() {
-			private static final long serialVersionUID = 5726188262756267490L;
+			private static final long serialVersionUID = 887700237024758417L;
 
 			@Override
 			public boolean filter(Event value) throws Exception {
 				return value.getName().equals("d");
 			}
 		}).notNext("notPattern").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5239529076086933032L;
+
 			@Override
 			public boolean filter(Event value) throws Exception {
 				return value.getName().equals("b");
@@ -3321,26 +3101,28 @@ public class NFAITCase extends TestLogger {
 		inputEvents.add(new StreamRecord<>(d, 5));
 
 		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
-			private static final long serialVersionUID = 5726188262756267490L;
+			private static final long serialVersionUID = -2641662468313191976L;
 
 			@Override
 			public boolean filter(Event value) throws Exception {
 				return value.getName().equals("a");
 			}
 		}).notFollowedBy("notPattern").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = -3632144132379494778L;
+
 			@Override
 			public boolean filter(Event value) throws Exception {
 				return value.getName().equals("b");
 			}
 		}).followedByAny("middle").where(new SimpleCondition<Event>() {
-			private static final long serialVersionUID = 5726188262756267490L;
+			private static final long serialVersionUID = 3818766882138348167L;
 
 			@Override
 			public boolean filter(Event value) throws Exception {
 				return value.getName().equals("c");
 			}
 		}).followedBy("end").where(new SimpleCondition<Event>() {
-			private static final long serialVersionUID = 5726188262756267490L;
+			private static final long serialVersionUID = 2033204730795451288L;
 
 			@Override
 			public boolean filter(Event value) throws Exception {
@@ -3374,26 +3156,28 @@ public class NFAITCase extends TestLogger {
 		inputEvents.add(new StreamRecord<>(d, 5));
 
 		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
-			private static final long serialVersionUID = 5726188262756267490L;
+			private static final long serialVersionUID = -2454396370205097543L;
 
 			@Override
 			public boolean filter(Event value) throws Exception {
 				return value.getName().equals("a");
 			}
 		}).notFollowedBy("notPattern").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 2749547391611263290L;
+
 			@Override
 			public boolean filter(Event value) throws Exception {
 				return value.getName().equals("b");
 			}
 		}).followedByAny("middle").where(new SimpleCondition<Event>() {
-			private static final long serialVersionUID = 5726188262756267490L;
+			private static final long serialVersionUID = -4989511337298217255L;
 
 			@Override
 			public boolean filter(Event value) throws Exception {
 				return value.getName().equals("c");
 			}
 		}).optional().followedBy("end").where(new SimpleCondition<Event>() {
-			private static final long serialVersionUID = 5726188262756267490L;
+			private static final long serialVersionUID = -8466223836652936608L;
 
 			@Override
 			public boolean filter(Event value) throws Exception {
@@ -3427,26 +3211,28 @@ public class NFAITCase extends TestLogger {
 		inputEvents.add(new StreamRecord<>(d, 5));
 
 		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
-			private static final long serialVersionUID = 5726188262756267490L;
+			private static final long serialVersionUID = -2568839911852184515L;
 
 			@Override
 			public boolean filter(Event value) throws Exception {
 				return value.getName().equals("a");
 			}
 		}).followedByAny("middle").where(new SimpleCondition<Event>() {
-			private static final long serialVersionUID = 5726188262756267490L;
+			private static final long serialVersionUID = -3632232424064269636L;
 
 			@Override
 			public boolean filter(Event value) throws Exception {
 				return value.getName().equals("b");
 			}
 		}).times(2).notFollowedBy("notPattern").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 3685596793523534611L;
+
 			@Override
 			public boolean filter(Event value) throws Exception {
 				return value.getName().equals("c");
 			}
 		}).followedBy("end").where(new SimpleCondition<Event>() {
-			private static final long serialVersionUID = 5726188262756267490L;
+			private static final long serialVersionUID = 1960758663575587243L;
 
 			@Override
 			public boolean filter(Event value) throws Exception {
@@ -3482,26 +3268,28 @@ public class NFAITCase extends TestLogger {
 		inputEvents.add(new StreamRecord<>(d2, 5));
 
 		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
-			private static final long serialVersionUID = 5726188262756267490L;
+			private static final long serialVersionUID = 2814850350025111940L;
 
 			@Override
 			public boolean filter(Event value) throws Exception {
 				return value.getName().equals("a");
 			}
 		}).notFollowedBy("notPattern").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 4988756153568853834L;
+
 			@Override
 			public boolean filter(Event value) throws Exception {
 				return value.getName().equals("b");
 			}
 		}).followedByAny("middle").where(new SimpleCondition<Event>() {
-			private static final long serialVersionUID = 5726188262756267490L;
+			private static final long serialVersionUID = -225909103322018778L;
 
 			@Override
 			public boolean filter(Event value) throws Exception {
 				return value.getName().equals("c");
 			}
 		}).times(2).optional().followedBy("end").where(new SimpleCondition<Event>() {
-			private static final long serialVersionUID = 5726188262756267490L;
+			private static final long serialVersionUID = -924294627956373696L;
 
 			@Override
 			public boolean filter(Event value) throws Exception {
@@ -3539,26 +3327,28 @@ public class NFAITCase extends TestLogger {
 		inputEvents.add(new StreamRecord<>(d2, 5));
 
 		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
-			private static final long serialVersionUID = 5726188262756267490L;
+			private static final long serialVersionUID = 6193105689601702341L;
 
 			@Override
 			public boolean filter(Event value) throws Exception {
 				return value.getName().equals("a");
 			}
 		}).followedByAny("middle").where(new SimpleCondition<Event>() {
-			private static final long serialVersionUID = 5726188262756267490L;
+			private static final long serialVersionUID = 5195859580923169111L;
 
 			@Override
 			public boolean filter(Event value) throws Exception {
 				return value.getName().equals("b");
 			}
 		}).times(2).notFollowedBy("notPattern").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 4973027956103783831L;
+
 			@Override
 			public boolean filter(Event value) throws Exception {
 				return value.getName().equals("c");
 			}
 		}).followedBy("end").where(new SimpleCondition<Event>() {
-			private static final long serialVersionUID = 5726188262756267490L;
+			private static final long serialVersionUID = 2724622546678984894L;
 
 			@Override
 			public boolean filter(Event value) throws Exception {
@@ -3588,19 +3378,21 @@ public class NFAITCase extends TestLogger {
 		inputEvents.add(new StreamRecord<>(c2, 4));
 
 		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
-			private static final long serialVersionUID = 5726188262756267490L;
+			private static final long serialVersionUID = -4289351792573443294L;
 
 			@Override
 			public boolean filter(Event value) throws Exception {
 				return value.getName().equals("a");
 			}
 		}).notFollowedBy("notPattern").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = -4989574608417523507L;
+
 			@Override
 			public boolean filter(Event value) throws Exception {
 				return value.getName().equals("b");
 			}
 		}).followedByAny("end").where(new SimpleCondition<Event>() {
-			private static final long serialVersionUID = 5726188262756267490L;
+			private static final long serialVersionUID = -5940131818629290579L;
 
 			@Override
 			public boolean filter(Event value) throws Exception {
@@ -3635,26 +3427,28 @@ public class NFAITCase extends TestLogger {
 		inputEvents.add(new StreamRecord<>(d, 5));
 
 		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
-			private static final long serialVersionUID = 5726188262756267490L;
+			private static final long serialVersionUID = -7885381452276160322L;
 
 			@Override
 			public boolean filter(Event value) throws Exception {
 				return value.getName().equals("a");
 			}
 		}).notFollowedBy("notPattern").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 3471511260235826653L;
+
 			@Override
 			public boolean filter(Event value) throws Exception {
 				return value.getName().equals("b");
 			}
 		}).followedByAny("middle").where(new SimpleCondition<Event>() {
-			private static final long serialVersionUID = 5726188262756267490L;
+			private static final long serialVersionUID = 9073793782452363833L;
 
 			@Override
 			public boolean filter(Event value) throws Exception {
 				return value.getName().equals("c");
 			}
 		}).times(2).optional().followedBy("end").where(new SimpleCondition<Event>() {
-			private static final long serialVersionUID = 5726188262756267490L;
+			private static final long serialVersionUID = 7972902718259767076L;
 
 			@Override
 			public boolean filter(Event value) throws Exception {
@@ -3690,26 +3484,28 @@ public class NFAITCase extends TestLogger {
 		inputEvents.add(new StreamRecord<>(d, 6));
 
 		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
-			private static final long serialVersionUID = 5726188262756267490L;
+			private static final long serialVersionUID = -7866220136345465444L;
 
 			@Override
 			public boolean filter(Event value) throws Exception {
 				return value.getName().equals("a");
 			}
 		}).notFollowedBy("notPattern").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 4957837489028234932L;
+
 			@Override
 			public boolean filter(Event value) throws Exception {
 				return value.getName().equals("b");
 			}
 		}).followedBy("middle").where(new SimpleCondition<Event>() {
-			private static final long serialVersionUID = 5726188262756267490L;
+			private static final long serialVersionUID = 5569569968862808007L;
 
 			@Override
 			public boolean filter(Event value) throws Exception {
 				return value.getName().equals("c");
 			}
 		}).followedBy("end").where(new SimpleCondition<Event>() {
-			private static final long serialVersionUID = 5726188262756267490L;
+			private static final long serialVersionUID = -8579678167937416269L;
 
 			@Override
 			public boolean filter(Event value) throws Exception {
@@ -4125,12 +3921,16 @@ public class NFAITCase extends TestLogger {
 		List<List<Event>> resultingPatterns = new ArrayList<>();
 
 		for (StreamRecord<Event> inputEvent : inputEvents) {
-			Collection<Map<String, Event>> patterns = nfa.process(
+			Collection<Map<String, List<Event>>> patterns = nfa.process(
 				inputEvent.getValue(),
 				inputEvent.getTimestamp()).f0;
 
-			for (Map<String, Event> p: patterns) {
-				resultingPatterns.add(new ArrayList<>(p.values()));
+			for (Map<String, List<Event>> p: patterns) {
+				List<Event> res = new ArrayList<>();
+				for (List<Event> le: p.values()) {
+					res.addAll(le);
+				}
+				resultingPatterns.add(res);
 			}
 		}
 		return resultingPatterns;

http://git-wip-us.apache.org/repos/asf/flink/blob/fa64a60f/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java
index d2e392b..11d193a 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java
@@ -82,20 +82,20 @@ public class NFATest extends TestLogger {
 		nfa.addState(endState);
 		nfa.addState(endingState);
 
-		Set<Map<String, Event>> expectedPatterns = new HashSet<>();
+		Set<Map<String, List<Event>>> expectedPatterns = new HashSet<>();
 
-		Map<String, Event> firstPattern = new HashMap<>();
-		firstPattern.put("start", new Event(1, "start", 1.0));
-		firstPattern.put("end", new Event(4, "end", 4.0));
+		Map<String, List<Event>> firstPattern = new HashMap<>();
+		firstPattern.put("start", Collections.singletonList(new Event(1, "start", 1.0)));
+		firstPattern.put("end", Collections.singletonList(new Event(4, "end", 4.0)));
 
-		Map<String, Event> secondPattern = new HashMap<>();
-		secondPattern.put("start", new Event(3, "start", 3.0));
-		secondPattern.put("end", new Event(4, "end", 4.0));
+		Map<String, List<Event>> secondPattern = new HashMap<>();
+		secondPattern.put("start", Collections.singletonList(new Event(3, "start", 3.0)));
+		secondPattern.put("end", Collections.singletonList(new Event(4, "end", 4.0)));
 
 		expectedPatterns.add(firstPattern);
 		expectedPatterns.add(secondPattern);
 
-		Collection<Map<String, Event>> actualPatterns = runNFA(nfa, streamEvents);
+		Collection<Map<String, List<Event>>> actualPatterns = runNFA(nfa, streamEvents);
 
 		assertEquals(expectedPatterns, actualPatterns);
 	}
@@ -110,15 +110,15 @@ public class NFATest extends TestLogger {
 		streamEvents.add(new StreamRecord<>(new Event(3, "start", 3.0), 3L));
 		streamEvents.add(new StreamRecord<>(new Event(4, "end", 4.0), 4L));
 
-		Set<Map<String, Event>> expectedPatterns = new HashSet<>();
+		Set<Map<String, List<Event>>> expectedPatterns = new HashSet<>();
 
-		Map<String, Event> secondPattern = new HashMap<>();
-		secondPattern.put("start", new Event(3, "start", 3.0));
-		secondPattern.put("end", new Event(4, "end", 4.0));
+		Map<String, List<Event>> secondPattern = new HashMap<>();
+		secondPattern.put("start", Collections.singletonList(new Event(3, "start", 3.0)));
+		secondPattern.put("end", Collections.singletonList(new Event(4, "end", 4.0)));
 
 		expectedPatterns.add(secondPattern);
 
-		Collection<Map<String, Event>> actualPatterns = runNFA(nfa, streamEvents);
+		Collection<Map<String, List<Event>>> actualPatterns = runNFA(nfa, streamEvents);
 
 		assertEquals(expectedPatterns, actualPatterns);
 	}
@@ -135,9 +135,9 @@ public class NFATest extends TestLogger {
 		streamEvents.add(new StreamRecord<>(new Event(1, "start", 1.0), 1L));
 		streamEvents.add(new StreamRecord<>(new Event(2, "end", 2.0), 3L));
 
-		Set<Map<String, Event>> expectedPatterns = Collections.emptySet();
+		Set<Map<String, List<Event>>> expectedPatterns = Collections.emptySet();
 
-		Collection<Map<String, Event>> actualPatterns = runNFA(nfa, streamEvents);
+		Collection<Map<String, List<Event>>> actualPatterns = runNFA(nfa, streamEvents);
 
 		assertEquals(expectedPatterns, actualPatterns);
 	}
@@ -156,40 +156,24 @@ public class NFATest extends TestLogger {
 		streamEvents.add(new StreamRecord<>(new Event(3, "foobar", 3.0), 3L));
 		streamEvents.add(new StreamRecord<>(new Event(4, "end", 4.0), 3L));
 
-		Set<Map<String, Event>> expectedPatterns = new HashSet<>();
+		Set<Map<String, List<Event>>> expectedPatterns = new HashSet<>();
 
-		Map<String, Event> secondPattern = new HashMap<>();
-		secondPattern.put("start", new Event(2, "start", 2.0));
-		secondPattern.put("end", new Event(4, "end", 4.0));
+		Map<String, List<Event>> secondPattern = new HashMap<>();
+		secondPattern.put("start", Collections.singletonList(new Event(2, "start", 2.0)));
+		secondPattern.put("end", Collections.singletonList(new Event(4, "end", 4.0)));
 
 		expectedPatterns.add(secondPattern);
 
-		Collection<Map<String, Event>> actualPatterns = runNFA(nfa, streamEvents);
+		Collection<Map<String, List<Event>>> actualPatterns = runNFA(nfa, streamEvents);
 
 		assertEquals(expectedPatterns, actualPatterns);
 	}
 
-	@Test
-	public void testStateNameGeneration() {
-		String expectedName1 = "a[2]";
-		String expectedName2 = "a_3";
-		String expectedName3 = "a[][42]";
-
-		String generatedName1 = NFA.generateStateName("a[]", 2);
-		String generatedName2 = NFA.generateStateName("a", 3);
-		String generatedName3 = NFA.generateStateName("a[][]", 42);
-
-
-		assertEquals(expectedName1, generatedName1);
-		assertEquals(expectedName2, generatedName2);
-		assertEquals(expectedName3, generatedName3);
-	}
-
-	public <T> Collection<Map<String, T>> runNFA(NFA<T> nfa, List<StreamRecord<T>> inputs) {
-		Set<Map<String, T>> actualPatterns = new HashSet<>();
+	public <T> Collection<Map<String, List<T>>> runNFA(NFA<T> nfa, List<StreamRecord<T>> inputs) {
+		Set<Map<String, List<T>>> actualPatterns = new HashSet<>();
 
 		for (StreamRecord<T> streamEvent : inputs) {
-			Collection<Map<String, T>> matchedPatterns = nfa.process(
+			Collection<Map<String, List<T>>> matchedPatterns = nfa.process(
 				streamEvent.getValue(),
 				streamEvent.getTimestamp()).f0;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/fa64a60f/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SharedBufferTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SharedBufferTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SharedBufferTest.java
index adc07b3..2da3c31 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SharedBufferTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SharedBufferTest.java
@@ -18,7 +18,8 @@
 
 package org.apache.flink.cep.nfa;
 
-import com.google.common.collect.LinkedHashMultimap;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.ListMultimap;
 import org.apache.flink.cep.Event;
 import org.apache.flink.util.TestLogger;
 import org.junit.Test;
@@ -48,12 +49,12 @@ public class SharedBufferTest extends TestLogger {
 			events[i] = new Event(i + 1, "e" + (i + 1), i);
 		}
 
-		LinkedHashMultimap<String, Event> expectedPattern1 = LinkedHashMultimap.create();
+		ListMultimap<String, Event> expectedPattern1 = ArrayListMultimap.create();
 		expectedPattern1.put("a1", events[2]);
 		expectedPattern1.put("a[]", events[3]);
 		expectedPattern1.put("b", events[5]);
 
-		LinkedHashMultimap<String, Event> expectedPattern2 = LinkedHashMultimap.create();
+		ListMultimap<String, Event> expectedPattern2 = ArrayListMultimap.create();
 		expectedPattern2.put("a1", events[0]);
 		expectedPattern2.put("a[]", events[1]);
 		expectedPattern2.put("a[]", events[2]);
@@ -61,7 +62,7 @@ public class SharedBufferTest extends TestLogger {
 		expectedPattern2.put("a[]", events[4]);
 		expectedPattern2.put("b", events[5]);
 
-		LinkedHashMultimap<String, Event> expectedPattern3 = LinkedHashMultimap.create();
+		ListMultimap<String, Event> expectedPattern3 = ArrayListMultimap.create();
 		expectedPattern3.put("a1", events[0]);
 		expectedPattern3.put("a[]", events[1]);
 		expectedPattern3.put("a[]", events[2]);
@@ -84,11 +85,11 @@ public class SharedBufferTest extends TestLogger {
 		sharedBuffer.put("a[]", events[6], timestamp, "a[]", events[5], timestamp, DeweyNumber.fromString("1.1"));
 		sharedBuffer.put("b", events[7], timestamp, "a[]", events[6], timestamp, DeweyNumber.fromString("1.1.0"));
 
-		Collection<LinkedHashMultimap<String, Event>> patterns3 = sharedBuffer.extractPatterns("b", events[7], timestamp, DeweyNumber.fromString("1.1.0"));
+		Collection<ListMultimap<String, Event>> patterns3 = sharedBuffer.extractPatterns("b", events[7], timestamp, DeweyNumber.fromString("1.1.0"));
 		sharedBuffer.release("b", events[7], timestamp);
-		Collection<LinkedHashMultimap<String, Event>> patterns4 = sharedBuffer.extractPatterns("b", events[7], timestamp, DeweyNumber.fromString("1.1.0"));
-		Collection<LinkedHashMultimap<String, Event>> patterns1 = sharedBuffer.extractPatterns("b", events[5], timestamp, DeweyNumber.fromString("2.0.0"));
-		Collection<LinkedHashMultimap<String, Event>> patterns2 = sharedBuffer.extractPatterns("b", events[5], timestamp, DeweyNumber.fromString("1.0.0"));
+		Collection<ListMultimap<String, Event>> patterns4 = sharedBuffer.extractPatterns("b", events[7], timestamp, DeweyNumber.fromString("1.1.0"));
+		Collection<ListMultimap<String, Event>> patterns1 = sharedBuffer.extractPatterns("b", events[5], timestamp, DeweyNumber.fromString("2.0.0"));
+		Collection<ListMultimap<String, Event>> patterns2 = sharedBuffer.extractPatterns("b", events[5], timestamp, DeweyNumber.fromString("1.0.0"));
 		sharedBuffer.release("b", events[5], timestamp);
 
 		assertEquals(1L, patterns3.size());

http://git-wip-us.apache.org/repos/asf/flink/blob/fa64a60f/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java
index 90a6321..26b8ce9 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.cep.nfa.compiler;
 
-import com.google.common.collect.Sets;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;

http://git-wip-us.apache.org/repos/asf/flink/blob/fa64a60f/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPFrom12MigrationTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPFrom12MigrationTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPFrom12MigrationTest.java
index 2f7cdeb..afb3e7c 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPFrom12MigrationTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPFrom12MigrationTest.java
@@ -36,6 +36,7 @@ import org.apache.flink.streaming.util.OperatorSnapshotUtil;
 import org.junit.Ignore;
 import org.junit.Test;
 
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentLinkedQueue;
 
@@ -72,7 +73,7 @@ public class CEPFrom12MigrationTest {
 		final SubEvent middleEvent1 = new SubEvent(42, "foo1", 1.0, 10.0);
 		final SubEvent middleEvent2 = new SubEvent(42, "foo2", 2.0, 10.0);
 
-		OneInputStreamOperatorTestHarness<Event, Map<String, Event>> harness =
+		OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness =
 				new KeyedOneInputStreamOperatorTestHarness<>(
 						new KeyedCEPPatternOperator<>(
 								Event.createTypeSerializer(),
@@ -120,7 +121,7 @@ public class CEPFrom12MigrationTest {
 		final SubEvent middleEvent2 = new SubEvent(42, "foo2", 2.0, 10.0);
 		final Event endEvent = new Event(42, "end", 1.0);
 
-		OneInputStreamOperatorTestHarness<Event, Map<String, Event>> harness =
+		OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness =
 				new KeyedOneInputStreamOperatorTestHarness<>(
 						new KeyedCEPPatternOperator<>(
 								Event.createTypeSerializer(),
@@ -160,18 +161,18 @@ public class CEPFrom12MigrationTest {
 		assertTrue(resultRecord2.getValue() instanceof Map);
 
 		@SuppressWarnings("unchecked")
-		Map<String, Event> patternMap1 = (Map<String, Event>) resultRecord1.getValue();
+		Map<String, List<Event>> patternMap1 = (Map<String, List<Event>>) resultRecord1.getValue();
 
-		assertEquals(startEvent, patternMap1.get("start"));
-		assertEquals(middleEvent1, patternMap1.get("middle"));
-		assertEquals(endEvent, patternMap1.get("end"));
+		assertEquals(startEvent, patternMap1.get("start").get(0));
+		assertEquals(middleEvent1, patternMap1.get("middle").get(0));
+		assertEquals(endEvent, patternMap1.get("end").get(0));
 
 		@SuppressWarnings("unchecked")
-		Map<String, Event> patternMap2 = (Map<String, Event>) resultRecord2.getValue();
+		Map<String, List<Event>> patternMap2 = (Map<String, List<Event>>) resultRecord2.getValue();
 
-		assertEquals(startEvent, patternMap2.get("start"));
-		assertEquals(middleEvent2, patternMap2.get("middle"));
-		assertEquals(endEvent, patternMap2.get("end"));
+		assertEquals(startEvent, patternMap2.get("start").get(0));
+		assertEquals(middleEvent2, patternMap2.get("middle").get(0));
+		assertEquals(endEvent, patternMap2.get("end").get(0));
 
 		harness.close();
 	}
@@ -195,7 +196,7 @@ public class CEPFrom12MigrationTest {
 		final Event startEvent1 = new Event(42, "start", 1.0);
 		final SubEvent middleEvent1 = new SubEvent(42, "foo1", 1.0, 10.0);
 
-		OneInputStreamOperatorTestHarness<Event, Map<String, Event>> harness =
+		OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness =
 				new KeyedOneInputStreamOperatorTestHarness<>(
 						new KeyedCEPPatternOperator<>(
 								Event.createTypeSerializer(),
@@ -241,7 +242,7 @@ public class CEPFrom12MigrationTest {
 		final SubEvent middleEvent2 = new SubEvent(42, "foo2", 2.0, 10.0);
 		final Event endEvent = new Event(42, "end", 1.0);
 
-		OneInputStreamOperatorTestHarness<Event, Map<String, Event>> harness =
+		OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness =
 				new KeyedOneInputStreamOperatorTestHarness<>(
 						new KeyedCEPPatternOperator<>(
 								Event.createTypeSerializer(),
@@ -287,25 +288,25 @@ public class CEPFrom12MigrationTest {
 		assertTrue(resultRecord3.getValue() instanceof Map);
 
 		@SuppressWarnings("unchecked")
-		Map<String, Event> patternMap1 = (Map<String, Event>) resultRecord1.getValue();
+		Map<String, List<Event>> patternMap1 = (Map<String, List<Event>>) resultRecord1.getValue();
 
-		assertEquals(startEvent1, patternMap1.get("start"));
-		assertEquals(middleEvent1, patternMap1.get("middle"));
-		assertEquals(endEvent, patternMap1.get("end"));
+		assertEquals(startEvent1, patternMap1.get("start").get(0));
+		assertEquals(middleEvent1, patternMap1.get("middle").get(0));
+		assertEquals(endEvent, patternMap1.get("end").get(0));
 
 		@SuppressWarnings("unchecked")
-		Map<String, Event> patternMap2 = (Map<String, Event>) resultRecord2.getValue();
+		Map<String, List<Event>> patternMap2 = (Map<String, List<Event>>) resultRecord2.getValue();
 
-		assertEquals(startEvent1, patternMap2.get("start"));
-		assertEquals(middleEvent2, patternMap2.get("middle"));
-		assertEquals(endEvent, patternMap2.get("end"));
+		assertEquals(startEvent1, patternMap2.get("start").get(0));
+		assertEquals(middleEvent2, patternMap2.get("middle").get(0));
+		assertEquals(endEvent, patternMap2.get("end").get(0));
 
 		@SuppressWarnings("unchecked")
-		Map<String, Event> patternMap3 = (Map<String, Event>) resultRecord3.getValue();
+		Map<String, List<Event>> patternMap3 = (Map<String, List<Event>>) resultRecord3.getValue();
 
-		assertEquals(startEvent2, patternMap3.get("start"));
-		assertEquals(middleEvent2, patternMap3.get("middle"));
-		assertEquals(endEvent, patternMap3.get("end"));
+		assertEquals(startEvent2, patternMap3.get("start").get(0));
+		assertEquals(middleEvent2, patternMap3.get("middle").get(0));
+		assertEquals(endEvent, patternMap3.get("end").get(0));
 
 		harness.close();
 	}
@@ -328,7 +329,7 @@ public class CEPFrom12MigrationTest {
 
 		final Event startEvent1 = new Event(42, "start", 1.0);
 
-		OneInputStreamOperatorTestHarness<Event, Map<String, Event>> harness =
+		OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness =
 				new KeyedOneInputStreamOperatorTestHarness<>(
 						new KeyedCEPPatternOperator<>(
 								Event.createTypeSerializer(),
@@ -367,7 +368,7 @@ public class CEPFrom12MigrationTest {
 
 		final Event startEvent1 = new Event(42, "start", 1.0);
 
-		OneInputStreamOperatorTestHarness<Event, Map<String, Event>> harness =
+		OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness =
 				new KeyedOneInputStreamOperatorTestHarness<>(
 						new KeyedCEPPatternOperator<>(
 								Event.createTypeSerializer(),
@@ -401,9 +402,9 @@ public class CEPFrom12MigrationTest {
 		assertTrue(resultRecord.getValue() instanceof Map);
 
 		@SuppressWarnings("unchecked")
-		Map<String, Event> patternMap = (Map<String, Event>) resultRecord.getValue();
+		Map<String, List<Event>> patternMap = (Map<String, List<Event>>) resultRecord.getValue();
 
-		assertEquals(startEvent1, patternMap.get("start"));
+		assertEquals(startEvent1, patternMap.get("start").get(0));
 
 		harness.close();
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/fa64a60f/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java
index 4e05fcf..404de54 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java
@@ -37,6 +37,7 @@ import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.junit.Test;
 
 import java.net.URL;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentLinkedQueue;
 
@@ -95,7 +96,7 @@ public class CEPMigration11to13Test {
 		harness.close();
 		*/
 
-		OneInputStreamOperatorTestHarness<Event, Map<String, Event>> harness =
+		OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness =
 				new KeyedOneInputStreamOperatorTestHarness<>(
 						new KeyedCEPPatternOperator<>(
 								Event.createTypeSerializer(),
@@ -129,11 +130,11 @@ public class CEPMigration11to13Test {
 		assertTrue(resultRecord.getValue() instanceof Map);
 
 		@SuppressWarnings("unchecked")
-		Map<String, Event> patternMap = (Map<String, Event>) resultRecord.getValue();
+		Map<String, List<Event>> patternMap = (Map<String, List<Event>>) resultRecord.getValue();
 
-		assertEquals(startEvent, patternMap.get("start"));
-		assertEquals(middleEvent, patternMap.get("middle"));
-		assertEquals(endEvent, patternMap.get("end"));
+		assertEquals(startEvent, patternMap.get("start").get(0));
+		assertEquals(middleEvent, patternMap.get("middle").get(0));
+		assertEquals(endEvent, patternMap.get("end").get(0));
 
 		harness.close();
 	}
@@ -170,7 +171,7 @@ public class CEPMigration11to13Test {
 
 		NullByteKeySelector keySelector = new NullByteKeySelector();
 
-		OneInputStreamOperatorTestHarness<Event, Map<String, Event>> harness =
+		OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness =
 				new KeyedOneInputStreamOperatorTestHarness<>(
 						new KeyedCEPPatternOperator<>(
 								Event.createTypeSerializer(),
@@ -204,11 +205,11 @@ public class CEPMigration11to13Test {
 		assertTrue(resultRecord.getValue() instanceof Map);
 
 		@SuppressWarnings("unchecked")
-		Map<String, Event> patternMap = (Map<String, Event>) resultRecord.getValue();
+		Map<String, List<Event>> patternMap = (Map<String, List<Event>>) resultRecord.getValue();
 
-		assertEquals(startEvent, patternMap.get("start"));
-		assertEquals(middleEvent, patternMap.get("middle"));
-		assertEquals(endEvent, patternMap.get("end"));
+		assertEquals(startEvent, patternMap.get("start").get(0));
+		assertEquals(middleEvent, patternMap.get("middle").get(0));
+		assertEquals(endEvent, patternMap.get("end").get(0));
 
 		harness.close();
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/fa64a60f/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
index 4048bc2..5ed8b46 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
@@ -46,7 +46,9 @@ import org.junit.rules.TemporaryFolder;
 
 import static org.junit.Assert.*;
 
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Queue;
 
@@ -58,7 +60,7 @@ public class CEPOperatorTest extends TestLogger {
 	@Test
 	public void testKeyedCEPOperatorWatermarkForwarding() throws Exception {
 
-		OneInputStreamOperatorTestHarness<Event, Map<String, Event>> harness = getCepTestHarness(false);
+		OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness = getCepTestHarness(false);
 
 		harness.open();
 
@@ -74,7 +76,7 @@ public class CEPOperatorTest extends TestLogger {
 	@Test
 	public void testKeyedCEPOperatorCheckpointing() throws Exception {
 
-		OneInputStreamOperatorTestHarness<Event, Map<String, Event>> harness = getCepTestHarness(false);
+		OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness = getCepTestHarness(false);
 
 		harness.open();
 
@@ -138,7 +140,7 @@ public class CEPOperatorTest extends TestLogger {
 		RocksDBStateBackend rocksDBStateBackend = new RocksDBStateBackend(new MemoryStateBackend());
 		rocksDBStateBackend.setDbStoragePath(rocksDbPath);
 
-		OneInputStreamOperatorTestHarness<Event, Map<String, Event>> harness = getCepTestHarness(false);
+		OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness = getCepTestHarness(false);
 
 		harness.setStateBackend(rocksDBStateBackend);
 
@@ -208,7 +210,6 @@ public class CEPOperatorTest extends TestLogger {
 	 * Tests that the internal time of a CEP operator advances only given watermarks. See FLINK-5033
 	 */
 	@Test
-	@SuppressWarnings("unchecked")
 	public void testKeyedAdvancingTimeWithoutElements() throws Exception {
 		final KeySelector<Event, Integer> keySelector = new TestKeySelector();
 
@@ -216,10 +217,10 @@ public class CEPOperatorTest extends TestLogger {
 		final long watermarkTimestamp1 = 5L;
 		final long watermarkTimestamp2 = 13L;
 
-		final Map<String, Event> expectedSequence = new HashMap<>(2);
-		expectedSequence.put("start", startEvent);
+		final Map<String, List<Event>> expectedSequence = new HashMap<>(2);
+		expectedSequence.put("start", Collections.<Event>singletonList(startEvent));
 
-		OneInputStreamOperatorTestHarness<Event, Either<Tuple2<Map<String, Event>, Long>, Map<String, Event>>> harness = new KeyedOneInputStreamOperatorTestHarness<>(
+		OneInputStreamOperatorTestHarness<Event, Either<Tuple2<Map<String, List<Event>>, Long>, Map<String, List<Event>>>> harness = new KeyedOneInputStreamOperatorTestHarness<>(
 			new TimeoutKeyedCEPPatternOperator<>(
 				Event.createTypeSerializer(),
 				false,
@@ -234,7 +235,7 @@ public class CEPOperatorTest extends TestLogger {
 		try {
 			harness.setup(
 				new KryoSerializer<>(
-					(Class<Either<Tuple2<Map<String, Event>, Long>, Map<String, Event>>>) (Object) Either.class,
+					(Class<Either<Tuple2<Map<String, List<Event>>, Long>, Map<String, List<Event>>>>) (Object) Either.class,
 					new ExecutionConfig()));
 			harness.open();
 
@@ -256,13 +257,15 @@ public class CEPOperatorTest extends TestLogger {
 
 			assertTrue(resultObject instanceof StreamRecord);
 
-			StreamRecord<Either<Tuple2<Map<String, Event>, Long>, Map<String, Event>>> streamRecord = (StreamRecord<Either<Tuple2<Map<String,Event>,Long>,Map<String,Event>>>) resultObject;
+			StreamRecord<Either<Tuple2<Map<String, List<Event>>, Long>, Map<String, List<Event>>>> streamRecord =
+					(StreamRecord<Either<Tuple2<Map<String,List<Event>>,Long>,Map<String,List<Event>>>>) resultObject;
 
 			assertTrue(streamRecord.getValue() instanceof Either.Left);
 
-			Either.Left<Tuple2<Map<String, Event>, Long>, Map<String, Event>> left = (Either.Left<Tuple2<Map<String, Event>, Long>, Map<String, Event>>) streamRecord.getValue();
+			Either.Left<Tuple2<Map<String, List<Event>>, Long>, Map<String, List<Event>>> left =
+			(Either.Left<Tuple2<Map<String, List<Event>>, Long>, Map<String, List<Event>>>) streamRecord.getValue();
 
-			Tuple2<Map<String, Event>, Long> leftResult = left.left();
+			Tuple2<Map<String, List<Event>>, Long> leftResult = left.left();
 
 			assertEquals(watermarkTimestamp2, (long) leftResult.f1);
 			assertEquals(expectedSequence, leftResult.f0);
@@ -292,7 +295,7 @@ public class CEPOperatorTest extends TestLogger {
 
 		TestKeySelector keySelector = new TestKeySelector();
 		KeyedCEPPatternOperator<Event, Integer> operator = getKeyedCepOpearator(false, keySelector);
-		OneInputStreamOperatorTestHarness<Event, Map<String, Event>> harness = getCepTestHarness(operator);
+		OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness = getCepTestHarness(operator);
 
 		harness.open();
 
@@ -380,7 +383,7 @@ public class CEPOperatorTest extends TestLogger {
 
 		TestKeySelector keySelector = new TestKeySelector();
 		KeyedCEPPatternOperator<Event, Integer> operator = getKeyedCepOpearator(true, keySelector);
-		OneInputStreamOperatorTestHarness<Event, Map<String, Event>> harness = getCepTestHarness(operator);
+		OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness = getCepTestHarness(operator);
 
 		harness.open();
 
@@ -449,13 +452,13 @@ public class CEPOperatorTest extends TestLogger {
 		assertTrue(resultRecord.getValue() instanceof Map);
 
 		@SuppressWarnings("unchecked")
-		Map<String, Event> patternMap = (Map<String, Event>) resultRecord.getValue();
-		assertEquals(start, patternMap.get("start"));
-		assertEquals(middle, patternMap.get("middle"));
-		assertEquals(end, patternMap.get("end"));
+		Map<String, List<Event>> patternMap = (Map<String, List<Event>>) resultRecord.getValue();
+		assertEquals(start, patternMap.get("start").get(0));
+		assertEquals(middle, patternMap.get("middle").get(0));
+		assertEquals(end, patternMap.get("end").get(0));
 	}
 
-	private OneInputStreamOperatorTestHarness<Event, Map<String, Event>> getCepTestHarness(boolean isProcessingTime) throws Exception {
+	private OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> getCepTestHarness(boolean isProcessingTime) throws Exception {
 		KeySelector<Event, Integer> keySelector = new TestKeySelector();
 
 		return new KeyedOneInputStreamOperatorTestHarness<>(
@@ -464,7 +467,7 @@ public class CEPOperatorTest extends TestLogger {
 			BasicTypeInfo.INT_TYPE_INFO);
 	}
 
-	private OneInputStreamOperatorTestHarness<Event, Map<String, Event>> getCepTestHarness(
+	private OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> getCepTestHarness(
 			KeyedCEPPatternOperator<Event, Integer> cepOperator) throws Exception {
 		KeySelector<Event, Integer> keySelector = new TestKeySelector();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/fa64a60f/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPRescalingTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPRescalingTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPRescalingTest.java
index a048183..0210ef9 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPRescalingTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPRescalingTest.java
@@ -37,6 +37,7 @@ import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.junit.Test;
 
+import java.util.List;
 import java.util.Map;
 import java.util.Queue;
 
@@ -79,7 +80,7 @@ public class CEPRescalingTest {
 
 		// now we start the test, we go from parallelism 1 to 2.
 
-		OneInputStreamOperatorTestHarness<Event, Map<String, Event>> harness =
+		OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness =
 			getTestHarness(maxParallelism, 1, 0);
 		harness.open();
 
@@ -99,7 +100,7 @@ public class CEPRescalingTest {
 		// so we initialize the two tasks and we put the rest of
 		// the valid elements for the pattern on task 0.
 
-		OneInputStreamOperatorTestHarness<Event, Map<String, Event>> harness1 =
+		OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness1 =
 			getTestHarness(maxParallelism, 2, 0);
 
 		harness1.setup();
@@ -120,7 +121,7 @@ public class CEPRescalingTest {
 		verifyWatermark(harness1.getOutput().poll(), 2);
 		verifyPattern(harness1.getOutput().poll(), startEvent1, middleEvent1, endEvent1);
 
-		OneInputStreamOperatorTestHarness<Event, Map<String, Event>> harness2 =
+		OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness2 =
 			getTestHarness(maxParallelism, 2, 1);
 
 		harness2.setup();
@@ -198,15 +199,15 @@ public class CEPRescalingTest {
 
 		// starting the test, we will go from parallelism of 3 to parallelism of 2
 
-		OneInputStreamOperatorTestHarness<Event, Map<String, Event>> harness1 =
+		OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness1 =
 			getTestHarness(maxParallelism, 3, 0);
 		harness1.open();
 
-		OneInputStreamOperatorTestHarness<Event, Map<String, Event>> harness2 =
+		OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness2 =
 			getTestHarness(maxParallelism, 3, 1);
 		harness2.open();
 
-		OneInputStreamOperatorTestHarness<Event, Map<String, Event>> harness3 =
+		OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness3 =
 			getTestHarness(maxParallelism, 3, 2);
 		harness3.open();
 
@@ -251,13 +252,13 @@ public class CEPRescalingTest {
 			harness3.snapshot(0, 0)
 		);
 
-		OneInputStreamOperatorTestHarness<Event, Map<String, Event>> harness4 =
+		OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness4 =
 			getTestHarness(maxParallelism, 2, 0);
 		harness4.setup();
 		harness4.initializeState(snapshot);
 		harness4.open();
 
-		OneInputStreamOperatorTestHarness<Event, Map<String, Event>> harness5 =
+		OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness5 =
 			getTestHarness(maxParallelism, 2, 1);
 		harness5.setup();
 		harness5.initializeState(snapshot);
@@ -295,8 +296,8 @@ public class CEPRescalingTest {
 		assertTrue(resultRecord.getValue() instanceof Map);
 
 		@SuppressWarnings("unchecked")
-		Map<String, Event> patternMap = (Map<String, Event>) resultRecord.getValue();
-		if (patternMap.get("start").getId() == 7) {
+		Map<String, List<Event>> patternMap = (Map<String, List<Event>>) resultRecord.getValue();
+		if (patternMap.get("start").get(0).getId() == 7) {
 			verifyPattern(harness4.getOutput().poll(), startEvent1, middleEvent1, endEvent1);
 			verifyPattern(harness4.getOutput().poll(), startEvent3, middleEvent3, endEvent3);
 		} else {
@@ -327,13 +328,13 @@ public class CEPRescalingTest {
 		assertTrue(resultRecord.getValue() instanceof Map);
 
 		@SuppressWarnings("unchecked")
-		Map<String, Event> patternMap = (Map<String, Event>) resultRecord.getValue();
-		assertEquals(start, patternMap.get("start"));
-		assertEquals(middle, patternMap.get("middle"));
-		assertEquals(end, patternMap.get("end"));
+		Map<String, List<Event>> patternMap = (Map<String, List<Event>>) resultRecord.getValue();
+		assertEquals(start, patternMap.get("start").get(0));
+		assertEquals(middle, patternMap.get("middle").get(0));
+		assertEquals(end, patternMap.get("end").get(0));
 	}
 
-	private KeyedOneInputStreamOperatorTestHarness<Integer, Event, Map<String, Event>> getTestHarness(
+	private KeyedOneInputStreamOperatorTestHarness<Integer, Event, Map<String, List<Event>>> getTestHarness(
 		int maxParallelism,
 		int taskParallelism,
 		int subtaskIdx) throws Exception {


Mime
View raw message