flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From fhue...@apache.org
Subject [05/50] [abbrv] flink git commit: [FLINK-3318] Add support for quantifiers to CEP's pattern API
Date Thu, 30 Mar 2017 22:04:16 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/9001c4ef/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
index ccae848..825ba957 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
@@ -46,9 +46,9 @@ public class NFAITCase extends TestLogger {
 	public void testSimplePatternNFA() {
 		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
 
-		Event startEvent = new Event(42, "start", 1.0);
+		Event startEvent = new Event(41, "start", 1.0);
 		SubEvent middleEvent = new SubEvent(42, "foo", 1.0, 10.0);
-		Event endEvent=  new Event(43, "end", 1.0);
+		Event endEvent = new Event(43, "end", 1.0);
 
 		inputEvents.add(new StreamRecord<Event>(startEvent, 1));
 		inputEvents.add(new StreamRecord<Event>(new Event(43, "foobar", 1.0), 2));
@@ -102,6 +102,99 @@ public class NFAITCase extends TestLogger {
 		assertEquals(endEvent, patternMap.get("end"));
 	}
 
+	@Test
+	public void testStrictContinuityWithResults() {
+		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+		Event middleEvent1 = new Event(41, "a", 2.0);
+		Event end = new Event(42, "b", 4.0);
+
+		inputEvents.add(new StreamRecord<>(middleEvent1, 3));
+		inputEvents.add(new StreamRecord<>(end, 5));
+
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("middle").where(new FilterFunction<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("a");
+			}
+		}).next("end").where(new FilterFunction<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("b");
+			}
+		});
+
+		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+		Set<Set<Event>> resultingPatterns = new HashSet<>();
+		List<Collection<Event>> allPatterns = new ArrayList<>();
+
+		for (StreamRecord<Event> inputEvent : inputEvents) {
+			Collection<Map<String, Event>> patterns = nfa.process(
+				inputEvent.getValue(),
+				inputEvent.getTimestamp()).f0;
+
+			for (Map<String, Event> foundPattern : patterns) {
+				resultingPatterns.add(new HashSet<>(foundPattern.values()));
+				allPatterns.add(foundPattern.values());
+			}
+		}
+
+		assertEquals(1, allPatterns.size());
+		assertEquals(Sets.<Set<Event>>newHashSet(
+			Sets.newHashSet(middleEvent1, end)
+		), resultingPatterns);
+	}
+
+	@Test
+	public void testStrictContinuityNoResults() {
+		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+		Event middleEvent1 = new Event(41, "a", 2.0);
+		Event middleEvent2 = new Event(42, "c", 3.0);
+		Event end = new Event(43, "b", 4.0);
+
+		inputEvents.add(new StreamRecord<>(middleEvent1, 3));
+		inputEvents.add(new StreamRecord<>(middleEvent2, 4));
+		inputEvents.add(new StreamRecord<>(end, 5));
+
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("middle").where(new FilterFunction<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("a");
+			}
+		}).next("end").where(new FilterFunction<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("b");
+			}
+		});
+
+		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+		Set<Set<Event>> resultingPatterns = new HashSet<>();
+
+		for (StreamRecord<Event> inputEvent : inputEvents) {
+			Collection<Map<String, Event>> patterns = nfa.process(
+				inputEvent.getValue(),
+				inputEvent.getTimestamp()).f0;
+
+			for (Map<String, Event> foundPattern : patterns) {
+				resultingPatterns.add(new HashSet<>(foundPattern.values()));
+			}
+		}
+
+		assertEquals(Sets.newHashSet(), resultingPatterns);
+	}
+
 	/**
 	 * Tests that the NFA successfully filters out expired elements with respect to the window
 	 * length
@@ -327,6 +420,1247 @@ public class NFAITCase extends TestLogger {
 		), patterns);
 	}
 
+	@Test
+	public void testComplexBranchingAfterKleeneStar() {
+		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+		Event startEvent = new Event(40, "c", 1.0);
+		Event middleEvent1 = new Event(41, "a", 2.0);
+		Event middleEvent2 = new Event(42, "a", 3.0);
+		Event middleEvent3 = new Event(43, "a", 4.0);
+		Event end1 = new Event(44, "b", 5.0);
+		Event end2 = new Event(45, "d", 6.0);
+		Event end3 = new Event(46, "d", 7.0);
+		Event end4 = new Event(47, "e", 8.0);
+
+		inputEvents.add(new StreamRecord<>(startEvent, 1));
+		inputEvents.add(new StreamRecord<>(middleEvent1, 3));
+		inputEvents.add(new StreamRecord<>(middleEvent2, 4));
+		inputEvents.add(new StreamRecord<>(middleEvent3, 5));
+		inputEvents.add(new StreamRecord<>(end1, 6));
+		inputEvents.add(new StreamRecord<>(end2, 7));
+		inputEvents.add(new StreamRecord<>(end3, 8));
+		inputEvents.add(new StreamRecord<>(end4, 9));
+
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new FilterFunction<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("c");
+			}
+		}).followedBy("middle").where(new FilterFunction<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("a");
+			}
+		}).zeroOrMore(false).followedBy("end1").where(new FilterFunction<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("b");
+			}
+		})
+			.followedBy("end2").where(new FilterFunction<Event>() {
+				private static final long serialVersionUID = 5726188262756267490L;
+
+				@Override
+				public boolean filter(Event value) throws Exception {
+					return value.getName().equals("d");
+				}
+			})
+			.followedBy("end3").where(new FilterFunction<Event>() {
+				private static final long serialVersionUID = 5726188262756267490L;
+
+				@Override
+				public boolean filter(Event value) throws Exception {
+					return value.getName().equals("e");
+				}
+			});
+
+		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+		Set<Set<Event>> resultingPatterns = new HashSet<>();
+		List<Collection<Event>> allPatterns = new ArrayList<>();
+
+		for (StreamRecord<Event> inputEvent : inputEvents) {
+			Collection<Map<String, Event>> patterns = nfa.process(
+				inputEvent.getValue(),
+				inputEvent.getTimestamp()).f0;
+
+			for (Map<String, Event> foundPattern : patterns) {
+				resultingPatterns.add(new HashSet<>(foundPattern.values()));
+				allPatterns.add(foundPattern.values());
+			}
+		}
+
+		assertEquals(16, allPatterns.size());
+		assertEquals(Sets.newHashSet(
+			Sets.newHashSet(startEvent, middleEvent1, middleEvent2, middleEvent3, end1, end2, end4),
+			Sets.newHashSet(startEvent, middleEvent1, middleEvent2, end1, end2, end4),
+			Sets.newHashSet(startEvent, middleEvent1, middleEvent3, end1, end2, end4),
+			Sets.newHashSet(startEvent, middleEvent2, middleEvent3, end1, end2, end4),
+			Sets.newHashSet(startEvent, middleEvent1, end1, end2, end4),
+			Sets.newHashSet(startEvent, middleEvent2, end1, end2, end4),
+			Sets.newHashSet(startEvent, middleEvent3, end1, end2, end4),
+			Sets.newHashSet(startEvent, end1, end2, end4),
+			Sets.newHashSet(startEvent, middleEvent1, middleEvent2, middleEvent3, end1, end3, end4),
+			Sets.newHashSet(startEvent, middleEvent1, middleEvent2, end1, end3, end4),
+			Sets.newHashSet(startEvent, middleEvent1, middleEvent3, end1, end3, end4),
+			Sets.newHashSet(startEvent, middleEvent2, middleEvent3, end1, end3, end4),
+			Sets.newHashSet(startEvent, middleEvent1, end1, end3, end4),
+			Sets.newHashSet(startEvent, middleEvent2, end1, end3, end4),
+			Sets.newHashSet(startEvent, middleEvent3, end1, end3, end4),
+			Sets.newHashSet(startEvent, end1, end3, end4)
+		), resultingPatterns);
+	}
+
+	@Test
+	public void testKleeneStar() {
+		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+		Event startEvent = new Event(40, "c", 1.0);
+		Event middleEvent1 = new Event(41, "a", 2.0);
+		Event middleEvent2 = new Event(42, "a", 3.0);
+		Event end1 = new Event(44, "b", 5.0);
+
+		inputEvents.add(new StreamRecord<>(startEvent, 1));
+		inputEvents.add(new StreamRecord<>(middleEvent1, 3));
+		inputEvents.add(new StreamRecord<>(middleEvent2, 4));
+		inputEvents.add(new StreamRecord<>(end1, 6));
+
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new FilterFunction<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("c");
+			}
+		}).followedBy("middle").where(new FilterFunction<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("a");
+			}
+		}).zeroOrMore(false).followedBy("end1").where(new FilterFunction<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("b");
+			}
+		});
+
+		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+		Set<Set<Event>> resultingPatterns = new HashSet<>();
+		List<Collection<Event>> allPatterns = new ArrayList<>();
+
+		for (StreamRecord<Event> inputEvent : inputEvents) {
+			Collection<Map<String, Event>> patterns = nfa.process(
+				inputEvent.getValue(),
+				inputEvent.getTimestamp()).f0;
+
+			for (Map<String, Event> foundPattern : patterns) {
+				resultingPatterns.add(new HashSet<>(foundPattern.values()));
+				allPatterns.add(foundPattern.values());
+			}
+		}
+
+		assertEquals(4, allPatterns.size());
+		assertEquals(Sets.newHashSet(
+			Sets.newHashSet(startEvent, middleEvent1, middleEvent2, end1),
+			Sets.newHashSet(startEvent, middleEvent1, end1),
+			Sets.newHashSet(startEvent, middleEvent2, end1),
+			Sets.newHashSet(startEvent, end1)
+		), resultingPatterns);
+	}
+
+	@Test
+	public void testEagerKleeneStar() {
+		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+		Event startEvent = new Event(40, "c", 1.0);
+		Event middleEvent1 = new Event(41, "a", 2.0);
+		Event middleEvent2 = new Event(42, "a", 3.0);
+		Event middleEvent3 = new Event(43, "a", 4.0);
+		Event end1 = new Event(44, "b", 5.0);
+
+		inputEvents.add(new StreamRecord<>(startEvent, 1));
+		inputEvents.add(new StreamRecord<>(middleEvent1, 3));
+		inputEvents.add(new StreamRecord<>(middleEvent2, 4));
+		inputEvents.add(new StreamRecord<>(middleEvent3, 5));
+		inputEvents.add(new StreamRecord<>(end1, 6));
+
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new FilterFunction<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("c");
+			}
+		}).followedBy("middle").where(new FilterFunction<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("a");
+			}
+		}).zeroOrMore(true).followedBy("end1").where(new FilterFunction<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("b");
+			}
+		});
+
+		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+		Set<Set<Event>> resultingPatterns = new HashSet<>();
+		List<Collection<Event>> allPatterns = new ArrayList<>();
+
+		for (StreamRecord<Event> inputEvent : inputEvents) {
+			Collection<Map<String, Event>> patterns = nfa.process(
+				inputEvent.getValue(),
+				inputEvent.getTimestamp()).f0;
+
+			for (Map<String, Event> foundPattern : patterns) {
+				resultingPatterns.add(new HashSet<>(foundPattern.values()));
+				allPatterns.add(foundPattern.values());
+			}
+		}
+
+		assertEquals(4, allPatterns.size());
+		assertEquals(Sets.newHashSet(
+			Sets.newHashSet(startEvent, middleEvent1, middleEvent2, middleEvent3, end1),
+			Sets.newHashSet(startEvent, middleEvent1, middleEvent2, end1),
+			Sets.newHashSet(startEvent, middleEvent1, end1),
+			Sets.newHashSet(startEvent, end1)
+		), resultingPatterns);
+	}
+
+
+	@Test
+	public void testBeginWithKleeneStar() {
+		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+		Event middleEvent1 = new Event(40, "a", 2.0);
+		Event middleEvent2 = new Event(41, "a", 3.0);
+		Event middleEvent3 = new Event(41, "a", 3.0);
+		Event end = new Event(42, "b", 4.0);
+
+		inputEvents.add(new StreamRecord<>(middleEvent1, 3));
+		inputEvents.add(new StreamRecord<>(middleEvent2, 4));
+		inputEvents.add(new StreamRecord<>(middleEvent3, 5));
+		inputEvents.add(new StreamRecord<>(end, 6));
+
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("middle").where(new FilterFunction<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("a");
+			}
+		}).zeroOrMore().followedBy("end").where(new FilterFunction<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("b");
+			}
+		});
+
+		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+		Set<Set<Event>> resultingPatterns = new HashSet<>();
+		List<Collection<Event>> allPatterns = new ArrayList<>();
+
+		for (StreamRecord<Event> inputEvent : inputEvents) {
+			Collection<Map<String, Event>> patterns = nfa.process(
+				inputEvent.getValue(),
+				inputEvent.getTimestamp()).f0;
+
+			for (Map<String, Event> foundPattern : patterns) {
+				resultingPatterns.add(new HashSet<>(foundPattern.values()));
+				allPatterns.add(foundPattern.values());
+			}
+		}
+
+		assertEquals(7, allPatterns.size());
+		assertEquals(Sets.newHashSet(
+			Sets.newHashSet(middleEvent1, middleEvent2, middleEvent3, end),
+			Sets.newHashSet(middleEvent1, middleEvent2, end),
+			Sets.newHashSet(middleEvent2, middleEvent3, end),
+			Sets.newHashSet(middleEvent1, end),
+			Sets.newHashSet(middleEvent2, end),
+			Sets.newHashSet(middleEvent3, end),
+			Sets.newHashSet(end)
+		), resultingPatterns);
+	}
+
+	@Test
+	public void testKleeneStarAfterKleeneStar() {
+		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+		Event startEvent = new Event(40, "c", 1.0);
+		Event middleEvent1 = new Event(41, "a", 2.0);
+		Event middleEvent2 = new Event(42, "d", 3.0);
+		Event middleEvent3 = new Event(43, "d", 4.0);
+		Event end = new Event(44, "e", 4.0);
+
+		inputEvents.add(new StreamRecord<>(startEvent, 1));
+		inputEvents.add(new StreamRecord<>(middleEvent1, 3));
+		inputEvents.add(new StreamRecord<>(middleEvent2, 4));
+		inputEvents.add(new StreamRecord<>(middleEvent3, 5));
+		inputEvents.add(new StreamRecord<>(end, 6));
+
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new FilterFunction<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("c");
+			}
+		}).followedBy("middle-first").where(new FilterFunction<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("a");
+			}
+		}).zeroOrMore(false).followedBy("middle-second").where(new FilterFunction<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("d");
+			}
+		}).zeroOrMore(false).followedBy("end").where(new FilterFunction<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("e");
+			}
+		});
+
+		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+		Set<Set<Event>> resultingPatterns = new HashSet<>();
+		List<Collection<Event>> allPatterns = new ArrayList<>();
+
+		for (StreamRecord<Event> inputEvent : inputEvents) {
+			Collection<Map<String, Event>> patterns = nfa.process(
+				inputEvent.getValue(),
+				inputEvent.getTimestamp()).f0;
+
+			for (Map<String, Event> foundPattern : patterns) {
+				resultingPatterns.add(new HashSet<>(foundPattern.values()));
+				allPatterns.add(foundPattern.values());
+			}
+		}
+
+		assertEquals(8, allPatterns.size());
+		assertEquals(Sets.newHashSet(
+			Sets.newHashSet(startEvent, middleEvent1, middleEvent2, middleEvent3, end),
+			Sets.newHashSet(startEvent, middleEvent1, middleEvent3, end),
+			Sets.newHashSet(startEvent, middleEvent1, middleEvent2, end),
+			Sets.newHashSet(startEvent, middleEvent2, middleEvent3, end),
+			Sets.newHashSet(startEvent, middleEvent3, end),
+			Sets.newHashSet(startEvent, middleEvent2, end),
+			Sets.newHashSet(startEvent, middleEvent1, end),
+			Sets.newHashSet(startEvent, end)
+		), resultingPatterns);
+	}
+
+	@Test
+	public void testKleeneStarAfterBranching() {
+		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+		Event startEvent = new Event(40, "c", 1.0);
+		Event middleEvent1 = new Event(41, "a", 2.0);
+		Event middleEvent2 = new Event(42, "a", 3.0);
+		Event merging = new Event(42, "f", 3.0);
+		Event kleene1 = new Event(43, "d", 4.0);
+		Event kleene2 = new Event(44, "d", 4.0);
+		Event end = new Event(45, "e", 4.0);
+
+		inputEvents.add(new StreamRecord<>(startEvent, 1));
+		inputEvents.add(new StreamRecord<>(middleEvent1, 3));
+		inputEvents.add(new StreamRecord<>(middleEvent2, 4));
+		inputEvents.add(new StreamRecord<>(merging, 5));
+		inputEvents.add(new StreamRecord<>(kleene1, 6));
+		inputEvents.add(new StreamRecord<>(kleene2, 7));
+		inputEvents.add(new StreamRecord<>(end, 8));
+
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new FilterFunction<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
 
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("c");
+			}
+		}).followedBy("branching").where(new FilterFunction<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("a");
+			}
+		}).followedBy("merging").where(new FilterFunction<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("f");
+			}
+		}).followedBy("kleene").where(new FilterFunction<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("d");
+			}
+		}).zeroOrMore(false).followedBy("end").where(new FilterFunction<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("e");
+			}
+		});
+
+		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+		Set<Set<Event>> resultingPatterns = new HashSet<>();
+		List<Collection<Event>> allPatterns = new ArrayList<>();
+
+		for (StreamRecord<Event> inputEvent : inputEvents) {
+			Collection<Map<String, Event>> patterns = nfa.process(
+				inputEvent.getValue(),
+				inputEvent.getTimestamp()).f0;
+
+			for (Map<String, Event> foundPattern : patterns) {
+				resultingPatterns.add(new HashSet<>(foundPattern.values()));
+				allPatterns.add(foundPattern.values());
+			}
+		}
+
+		assertEquals(8, allPatterns.size());
+		assertEquals(Sets.newHashSet(
+			Sets.newHashSet(startEvent, middleEvent1, merging, end),
+			Sets.newHashSet(startEvent, middleEvent1, merging, kleene1, end),
+			Sets.newHashSet(startEvent, middleEvent1, merging, kleene2, end),
+			Sets.newHashSet(startEvent, middleEvent1, merging, kleene1, kleene2, end),
+			Sets.newHashSet(startEvent, middleEvent2, merging, end),
+			Sets.newHashSet(startEvent, middleEvent2, merging, kleene1, end),
+			Sets.newHashSet(startEvent, middleEvent2, merging, kleene2, end),
+			Sets.newHashSet(startEvent, middleEvent2, merging, kleene1, kleene2, end)
+		), resultingPatterns);
+	}
+
+	@Test
+	public void testStrictContinuityNoResultsAfterKleeneStar() {
+		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+		Event start = new Event(40, "d", 2.0);
+		Event middleEvent1 = new Event(41, "a", 2.0);
+		Event middleEvent2 = new Event(42, "a", 2.0);
+		Event middleEvent3 = new Event(43, "c", 3.0);
+		Event end = new Event(44, "b", 4.0);
+
+		inputEvents.add(new StreamRecord<>(start, 1));
+		inputEvents.add(new StreamRecord<>(middleEvent1, 2));
+		inputEvents.add(new StreamRecord<>(middleEvent2, 3));
+		inputEvents.add(new StreamRecord<>(middleEvent3, 4));
+		inputEvents.add(new StreamRecord<>(end, 5));
+
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new FilterFunction<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("d");
+			}
+		}).followedBy("middle").where(new FilterFunction<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("a");
+			}
+		}).zeroOrMore()
+			.next("end").where(new FilterFunction<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("b");
+			}
+		});
+
+		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+		Set<Set<Event>> resultingPatterns = new HashSet<>();
+
+
+		for (StreamRecord<Event> inputEvent : inputEvents) {
+			Collection<Map<String, Event>> patterns = nfa.process(
+				inputEvent.getValue(),
+				inputEvent.getTimestamp()).f0;
+
+			for (Map<String, Event> foundPattern : patterns) {
+				resultingPatterns.add(new HashSet<>(foundPattern.values()));
+			}
+		}
+
+		assertEquals(Sets.newHashSet(), resultingPatterns);
+	}
+
+	@Test
+	public void testStrictContinuityResultsAfterKleeneStar() {
+		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+		Event start = new Event(40, "d", 2.0);
+		Event middleEvent1 = new Event(41, "a", 2.0);
+		Event middleEvent2 = new Event(42, "a", 2.0);
+		Event end = new Event(43, "b", 4.0);
+
+		inputEvents.add(new StreamRecord<>(start, 1));
+		inputEvents.add(new StreamRecord<>(middleEvent1, 2));
+		inputEvents.add(new StreamRecord<>(middleEvent2, 3));
+		inputEvents.add(new StreamRecord<>(end, 5));
+
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new FilterFunction<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("d");
+			}
+		}).followedBy("middle").where(new FilterFunction<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("a");
+			}
+		}).zeroOrMore(false)
+			.next("end").where(new FilterFunction<Event>() {
+				private static final long serialVersionUID = 5726188262756267490L;
+
+				@Override
+				public boolean filter(Event value) throws Exception {
+					return value.getName().equals("b");
+				}
+			});
+
+		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+		Set<Set<Event>> resultingPatterns = new HashSet<>();
+		List<Collection<Event>> allPatterns = new ArrayList<>();
+
+		for (StreamRecord<Event> inputEvent : inputEvents) {
+			Collection<Map<String, Event>> patterns = nfa.process(
+				inputEvent.getValue(),
+				inputEvent.getTimestamp()).f0;
+
+			for (Map<String, Event> foundPattern : patterns) {
+				resultingPatterns.add(new HashSet<>(foundPattern.values()));
+				allPatterns.add(foundPattern.values());
+			}
+		}
+
+		assertEquals(2, allPatterns.size());
+		assertEquals(Sets.newHashSet(
+			Sets.newHashSet(start, middleEvent1, middleEvent2, end),
+			Sets.newHashSet(start, middleEvent2, end)
+		), resultingPatterns);
+	}
+
+	@Test
+	public void testAtLeastOne() {
+		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+		Event startEvent = new Event(40, "c", 1.0);
+		Event middleEvent1 = new Event(41, "a", 2.0);
+		Event middleEvent2 = new Event(42, "a", 3.0);
+		Event end1 = new Event(44, "b", 5.0);
+
+		inputEvents.add(new StreamRecord<>(startEvent, 1));
+		inputEvents.add(new StreamRecord<>(middleEvent1, 3));
+		inputEvents.add(new StreamRecord<>(middleEvent2, 4));
+		inputEvents.add(new StreamRecord<>(end1, 6));
+
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new FilterFunction<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("c");
+			}
+		}).followedBy("middle").where(new FilterFunction<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("a");
+			}
+		}).oneOrMore(false).followedBy("end1").where(new FilterFunction<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("b");
+			}
+		});
+
+		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+		Set<Set<Event>> resultingPatterns = new HashSet<>();
+		List<Collection<Event>> allPatterns = new ArrayList<>();
+
+		for (StreamRecord<Event> inputEvent : inputEvents) {
+			Collection<Map<String, Event>> patterns = nfa.process(
+				inputEvent.getValue(),
+				inputEvent.getTimestamp()).f0;
+
+			for (Map<String, Event> foundPattern : patterns) {
+				resultingPatterns.add(new HashSet<>(foundPattern.values()));
+				allPatterns.add(foundPattern.values());
+			}
+		}
+
+		assertEquals(3, allPatterns.size());
+		assertEquals(Sets.newHashSet(
+			Sets.newHashSet(startEvent, middleEvent1, middleEvent2, end1),
+			Sets.newHashSet(startEvent, middleEvent1, end1),
+			Sets.newHashSet(startEvent, middleEvent2, end1)
+		), resultingPatterns);
+	}
+
+	@Test
+	public void testBeginWithAtLeastOne() {
+		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+		Event startEvent1 = new Event(41, "a", 2.0);
+		Event startEvent2 = new Event(42, "a", 3.0);
+		Event startEvent3 = new Event(42, "a", 4.0);
+		Event end1 = new Event(44, "b", 5.0);
+
+		inputEvents.add(new StreamRecord<>(startEvent1, 3));
+		inputEvents.add(new StreamRecord<>(startEvent2, 4));
+		inputEvents.add(new StreamRecord<>(startEvent3, 5));
+		inputEvents.add(new StreamRecord<>(end1, 6));
+
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new FilterFunction<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("a");
+			}
+		}).oneOrMore(false).followedBy("end").where(new FilterFunction<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("b");
+			}
+		});
+
+		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+		Set<Set<Event>> resultingPatterns = new HashSet<>();
+		List<Collection<Event>> allPatterns = new ArrayList<>();
+
+		for (StreamRecord<Event> inputEvent : inputEvents) {
+			Collection<Map<String, Event>> patterns = nfa.process(
+				inputEvent.getValue(),
+				inputEvent.getTimestamp()).f0;
+
+			for (Map<String, Event> foundPattern : patterns) {
+				resultingPatterns.add(new HashSet<>(foundPattern.values()));
+				allPatterns.add(foundPattern.values());
+			}
+		}
+
+		assertEquals(7, allPatterns.size());
+		assertEquals(Sets.newHashSet(
+			Sets.newHashSet(startEvent1, startEvent2, startEvent3, end1),
+			Sets.newHashSet(startEvent1, startEvent2, end1),
+			Sets.newHashSet(startEvent1, startEvent3, end1),
+			Sets.newHashSet(startEvent2, startEvent3, end1),
+			Sets.newHashSet(startEvent1, end1),
+			Sets.newHashSet(startEvent2, end1),
+			Sets.newHashSet(startEvent3, end1)
+		), resultingPatterns);
+	}
+
+	@Test
+	public void testNextZeroOrMore() {
+		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+		Event startEvent = new Event(40, "start", 1.0);
+		Event middleEvent1 = new Event(40, "middle", 2.0);
+		Event middleEvent2 = new Event(40, "middle", 3.0);
+		Event middleEvent3 = new Event(40, "middle", 4.0);
+		Event endEvent = new Event(46, "end", 1.0);
+
+		inputEvents.add(new StreamRecord<>(startEvent, 1L));
+		inputEvents.add(new StreamRecord<>(new Event(1, "event", 1.0), 2L));
+		inputEvents.add(new StreamRecord<>(middleEvent1, 3L));
+		inputEvents.add(new StreamRecord<>(middleEvent2, 4L));
+		inputEvents.add(new StreamRecord<>(middleEvent3, 5L));
+		inputEvents.add(new StreamRecord<>(endEvent, 6L));
+
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new FilterFunction<Event>() {
+			private static final long serialVersionUID = 6215754202506583964L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("start");
+			}
+		}).next("middle").where(new FilterFunction<Event>() {
+			private static final long serialVersionUID = 6215754202506583964L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("middle");
+			}
+		}).zeroOrMore(false).followedBy("end").where(new FilterFunction<Event>() {
+			private static final long serialVersionUID = 7056763917392056548L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("end");
+			}
+		});
+
+		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+		Set<Set<Event>> resultingPatterns = new HashSet<>();
+		List<Collection<Event>> allPatterns = new ArrayList<>();
+
+		for (StreamRecord<Event> inputEvent : inputEvents) {
+			Collection<Map<String, Event>> patterns = nfa.process(
+				inputEvent.getValue(),
+				inputEvent.getTimestamp()).f0;
+
+			for (Map<String, Event> foundPattern : patterns) {
+				resultingPatterns.add(new HashSet<>(foundPattern.values()));
+				allPatterns.add(foundPattern.values());
+			}
+		}
+
+		assertEquals(1, allPatterns.size());
+		assertEquals(Sets.<Set<Event>>newHashSet(
+			Sets.newHashSet(startEvent, endEvent)
+		), resultingPatterns);
+	}
+
+	@Test
+	public void testAtLeastOneEager() {
+		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+		Event startEvent = new Event(40, "c", 1.0);
+		Event middleEvent1 = new Event(41, "a", 2.0);
+		Event middleEvent2 = new Event(42, "a", 3.0);
+		Event middleEvent3 = new Event(43, "a", 4.0);
+		Event end1 = new Event(44, "b", 5.0);
+
+		inputEvents.add(new StreamRecord<>(startEvent, 1));
+		inputEvents.add(new StreamRecord<>(middleEvent1, 3));
+		inputEvents.add(new StreamRecord<>(middleEvent2, 4));
+		inputEvents.add(new StreamRecord<>(middleEvent3, 5));
+		inputEvents.add(new StreamRecord<>(end1, 6));
+
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new FilterFunction<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("c");
+			}
+		}).followedBy("middle").where(new FilterFunction<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("a");
+			}
+		}).oneOrMore(true).followedBy("end1").where(new FilterFunction<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("b");
+			}
+		});
+
+		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+		Set<Set<Event>> resultingPatterns = new HashSet<>();
+		List<Collection<Event>> allPatterns = new ArrayList<>();
+
+		for (StreamRecord<Event> inputEvent : inputEvents) {
+			Collection<Map<String, Event>> patterns = nfa.process(
+				inputEvent.getValue(),
+				inputEvent.getTimestamp()).f0;
+
+			for (Map<String, Event> foundPattern : patterns) {
+				resultingPatterns.add(new HashSet<>(foundPattern.values()));
+				allPatterns.add(foundPattern.values());
+			}
+		}
+
+		assertEquals(3, allPatterns.size());
+		assertEquals(Sets.newHashSet(
+			Sets.newHashSet(startEvent, middleEvent1, middleEvent2, middleEvent3, end1),
+			Sets.newHashSet(startEvent, middleEvent1, middleEvent2, end1),
+			Sets.newHashSet(startEvent, middleEvent1, end1)
+		), resultingPatterns);
+	}
+
+	@Test
+	public void testOptional() {
+		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+		Event startEvent = new Event(40, "c", 1.0);
+		Event middleEvent = new Event(43, "a", 4.0);
+		Event end1 = new Event(44, "b", 5.0);
+
+		inputEvents.add(new StreamRecord<>(startEvent, 1));
+		inputEvents.add(new StreamRecord<>(middleEvent, 5));
+		inputEvents.add(new StreamRecord<>(end1, 6));
+
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new FilterFunction<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("c");
+			}
+		}).followedBy("middle").where(new FilterFunction<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("a");
+			}
+		}).optional().followedBy("end1").where(new FilterFunction<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("b");
+			}
+		});
+
+		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+		Set<Set<Event>> resultingPatterns = new HashSet<>();
+		List<Collection<Event>> allPatterns = new ArrayList<>();
+
+		for (StreamRecord<Event> inputEvent : inputEvents) {
+			Collection<Map<String, Event>> patterns = nfa.process(
+				inputEvent.getValue(),
+				inputEvent.getTimestamp()).f0;
+
+			for (Map<String, Event> foundPattern : patterns) {
+				resultingPatterns.add(new HashSet<>(foundPattern.values()));
+				allPatterns.add(foundPattern.values());
+			}
+		}
+
+		assertEquals(2, allPatterns.size());
+		assertEquals(Sets.newHashSet(
+			Sets.newHashSet(startEvent, middleEvent, end1),
+			Sets.newHashSet(startEvent, end1)
+		), resultingPatterns);
+	}
+
+
+	@Test
+	public void testTimes() {
+		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+		Event startEvent = new Event(40, "c", 1.0);
+		Event middleEvent1 = new Event(41, "a", 2.0);
+		Event middleEvent2 = new Event(42, "a", 3.0);
+		Event middleEvent3 = new Event(43, "a", 4.0);
+		Event end1 = new Event(44, "b", 5.0);
+
+		inputEvents.add(new StreamRecord<>(startEvent, 1));
+		inputEvents.add(new StreamRecord<>(middleEvent1, 2));
+		inputEvents.add(new StreamRecord<>(middleEvent2, 3));
+		inputEvents.add(new StreamRecord<>(middleEvent3, 4));
+		inputEvents.add(new StreamRecord<>(end1, 6));
+
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new FilterFunction<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("c");
+			}
+		}).next("middle").where(new FilterFunction<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("a");
+			}
+		}).times(2).followedBy("end1").where(new FilterFunction<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("b");
+			}
+		});
+
+		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+		Set<Set<Event>> resultingPatterns = new HashSet<>();
+		List<Collection<Event>> allPatterns = new ArrayList<>();
+
+		for (StreamRecord<Event> inputEvent : inputEvents) {
+			Collection<Map<String, Event>> patterns = nfa.process(
+				inputEvent.getValue(),
+				inputEvent.getTimestamp()).f0;
+
+			for (Map<String, Event> foundPattern : patterns) {
+				resultingPatterns.add(new HashSet<>(foundPattern.values()));
+				allPatterns.add(foundPattern.values());
+			}
+		}
+
+		assertEquals(1, allPatterns.size());
+		assertEquals(Sets.<Set<Event>>newHashSet(
+			Sets.newHashSet(startEvent, middleEvent1, middleEvent2, end1)
+		), resultingPatterns);
+	}
+
+	@Test
+	public void testStartWithTimes() {
+		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+		Event middleEvent1 = new Event(41, "a", 2.0);
+		Event middleEvent2 = new Event(42, "a", 3.0);
+		Event middleEvent3 = new Event(43, "a", 4.0);
+		Event end1 = new Event(44, "b", 5.0);
+
+		inputEvents.add(new StreamRecord<>(middleEvent1, 2));
+		inputEvents.add(new StreamRecord<>(middleEvent2, 3));
+		inputEvents.add(new StreamRecord<>(middleEvent3, 4));
+		inputEvents.add(new StreamRecord<>(end1, 6));
+
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("middle").where(new FilterFunction<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("a");
+			}
+		}).times(2).followedBy("end1").where(new FilterFunction<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("b");
+			}
+		});
+
+		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+		Set<Set<Event>> resultingPatterns = new HashSet<>();
+		List<Collection<Event>> allPatterns = new ArrayList<>();
+
+		for (StreamRecord<Event> inputEvent : inputEvents) {
+			Collection<Map<String, Event>> patterns = nfa.process(
+				inputEvent.getValue(),
+				inputEvent.getTimestamp()).f0;
+
+			for (Map<String, Event> foundPattern : patterns) {
+				resultingPatterns.add(new HashSet<>(foundPattern.values()));
+				allPatterns.add(foundPattern.values());
+			}
+		}
+
+		assertEquals(2, allPatterns.size());
+		assertEquals(Sets.<Set<Event>>newHashSet(
+			Sets.newHashSet(middleEvent1, middleEvent2, end1),
+			Sets.newHashSet(middleEvent2, middleEvent3, end1)
+		), resultingPatterns);
+	}
+
+	@Test
+	public void testStartWithOptional() {
+		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+		Event startEvent = new Event(40, "c", 1.0);
+		Event end1 = new Event(44, "b", 5.0);
+
+		inputEvents.add(new StreamRecord<>(startEvent, 1));
+		inputEvents.add(new StreamRecord<>(end1, 6));
+
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new FilterFunction<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("c");
+			}
+		}).optional().followedBy("end1").where(new FilterFunction<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("b");
+			}
+		});
+
+		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+		Set<Set<Event>> resultingPatterns = new HashSet<>();
+		List<Collection<Event>> allPatterns = new ArrayList<>();
+
+		for (StreamRecord<Event> inputEvent : inputEvents) {
+			Collection<Map<String, Event>> patterns = nfa.process(
+				inputEvent.getValue(),
+				inputEvent.getTimestamp()).f0;
+
+			for (Map<String, Event> foundPattern : patterns) {
+				resultingPatterns.add(new HashSet<>(foundPattern.values()));
+				allPatterns.add(foundPattern.values());
+			}
+		}
+
+		assertEquals(2, allPatterns.size());
+		assertEquals(Sets.newHashSet(
+			Sets.newHashSet(startEvent,  end1),
+			Sets.newHashSet(end1)
+		), resultingPatterns);
+	}
+
+	@Test
+	public void testEndWithZeroOrMore() {
+		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+		Event startEvent = new Event(40, "c", 1.0);
+		Event middleEvent1 = new Event(41, "a", 2.0);
+		Event middleEvent2 = new Event(42, "a", 3.0);
+		Event middleEvent3 = new Event(43, "a", 4.0);
+
+		inputEvents.add(new StreamRecord<>(startEvent, 1));
+		inputEvents.add(new StreamRecord<>(middleEvent1, 3));
+		inputEvents.add(new StreamRecord<>(middleEvent2, 4));
+		inputEvents.add(new StreamRecord<>(middleEvent3, 5));
+
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new FilterFunction<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("c");
+			}
+		}).followedBy("middle").where(new FilterFunction<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("a");
+			}
+		}).zeroOrMore();
+
+		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+		Set<Set<Event>> resultingPatterns = new HashSet<>();
+		List<Collection<Event>> allPatterns = new ArrayList<>();
+
+		for (StreamRecord<Event> inputEvent : inputEvents) {
+			Collection<Map<String, Event>> patterns = nfa.process(
+				inputEvent.getValue(),
+				inputEvent.getTimestamp()).f0;
+
+			for (Map<String, Event> foundPattern : patterns) {
+				resultingPatterns.add(new HashSet<>(foundPattern.values()));
+				allPatterns.add(foundPattern.values());
+			}
+		}
+
+		assertEquals(4, allPatterns.size());
+		assertEquals(Sets.newHashSet(
+			Sets.newHashSet(startEvent,  middleEvent1, middleEvent2, middleEvent3),
+			Sets.newHashSet(startEvent,  middleEvent1, middleEvent2),
+			Sets.newHashSet(startEvent,  middleEvent1),
+			Sets.newHashSet(startEvent)
+		), resultingPatterns);
+	}
+
+	@Test
+	public void testStartAndEndWithZeroOrMore() {
+		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+		Event startEvent = new Event(40, "c", 1.0);
+		Event middleEvent1 = new Event(41, "a", 2.0);
+		Event middleEvent2 = new Event(42, "a", 3.0);
+		Event middleEvent3 = new Event(43, "a", 4.0);
+		Event end1 = new Event(44, "d", 5.0);
+		Event end2 = new Event(45, "d", 5.0);
+		Event end3 = new Event(46, "d", 5.0);
+
+		inputEvents.add(new StreamRecord<>(startEvent, 1));
+		inputEvents.add(new StreamRecord<>(middleEvent1, 3));
+		inputEvents.add(new StreamRecord<>(middleEvent2, 4));
+		inputEvents.add(new StreamRecord<>(middleEvent3, 5));
+		inputEvents.add(new StreamRecord<>(end1, 6));
+		inputEvents.add(new StreamRecord<>(end2, 6));
+		inputEvents.add(new StreamRecord<>(end3, 6));
+
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new FilterFunction<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("a");
+			}
+		}).zeroOrMore();
+
+		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+		Set<Set<Event>> resultingPatterns = new HashSet<>();
+		List<Collection<Event>> allPatterns = new ArrayList<>();
+
+		for (StreamRecord<Event> inputEvent : inputEvents) {
+			Collection<Map<String, Event>> patterns = nfa.process(
+				inputEvent.getValue(),
+				inputEvent.getTimestamp()).f0;
+
+			for (Map<String, Event> foundPattern : patterns) {
+				resultingPatterns.add(new HashSet<>(foundPattern.values()));
+				allPatterns.add(foundPattern.values());
+			}
+		}
+
+		assertEquals(6, allPatterns.size());
+		assertEquals(Sets.newHashSet(
+			Sets.newHashSet(middleEvent1,  middleEvent2, middleEvent3),
+			Sets.newHashSet(middleEvent1,  middleEvent2),
+			Sets.newHashSet(middleEvent1),
+			Sets.newHashSet(middleEvent2,  middleEvent3),
+			Sets.newHashSet(middleEvent2),
+			Sets.newHashSet(middleEvent3)
+		), resultingPatterns);
+	}
+
+	@Test
+	public void testEndWithOptional() {
+		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+		Event startEvent = new Event(40, "c", 1.0);
+		Event middleEvent1 = new Event(41, "a", 2.0);
+
+		inputEvents.add(new StreamRecord<>(startEvent, 1));
+		inputEvents.add(new StreamRecord<>(middleEvent1, 3));
+
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new FilterFunction<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("c");
+			}
+		}).followedBy("middle").where(new FilterFunction<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("a");
+			}
+		}).optional();
+
+		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+		Set<Set<Event>> resultingPatterns = new HashSet<>();
+		List<Collection<Event>> allPatterns = new ArrayList<>();
+
+		for (StreamRecord<Event> inputEvent : inputEvents) {
+			Collection<Map<String, Event>> patterns = nfa.process(
+				inputEvent.getValue(),
+				inputEvent.getTimestamp()).f0;
+
+			for (Map<String, Event> foundPattern : patterns) {
+				resultingPatterns.add(new HashSet<>(foundPattern.values()));
+				allPatterns.add(foundPattern.values());
+			}
+		}
+
+		assertEquals(2, allPatterns.size());
+		assertEquals(Sets.newHashSet(
+			Sets.newHashSet(startEvent,  middleEvent1),
+			Sets.newHashSet(startEvent)
+		), resultingPatterns);
+	}
+
+	@Test
+	public void testEndWithOneOrMore() {
+		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+		Event startEvent = new Event(40, "c", 1.0);
+		Event middleEvent1 = new Event(41, "a", 2.0);
+		Event middleEvent2 = new Event(42, "a", 3.0);
+		Event middleEvent3 = new Event(43, "a", 4.0);
+
+		inputEvents.add(new StreamRecord<>(startEvent, 1));
+		inputEvents.add(new StreamRecord<>(middleEvent1, 3));
+		inputEvents.add(new StreamRecord<>(middleEvent2, 4));
+		inputEvents.add(new StreamRecord<>(middleEvent3, 5));
+
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new FilterFunction<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("c");
+			}
+		}).followedBy("middle").where(new FilterFunction<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("a");
+			}
+		}).oneOrMore();
+
+		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+		Set<Set<Event>> resultingPatterns = new HashSet<>();
+		List<Collection<Event>> allPatterns = new ArrayList<>();
+
+		for (StreamRecord<Event> inputEvent : inputEvents) {
+			Collection<Map<String, Event>> patterns = nfa.process(
+				inputEvent.getValue(),
+				inputEvent.getTimestamp()).f0;
+
+			for (Map<String, Event> foundPattern : patterns) {
+				resultingPatterns.add(new HashSet<>(foundPattern.values()));
+				allPatterns.add(foundPattern.values());
+			}
+		}
+
+		assertEquals(3, allPatterns.size());
+		assertEquals(Sets.newHashSet(
+			Sets.newHashSet(startEvent,  middleEvent1, middleEvent2, middleEvent3),
+			Sets.newHashSet(startEvent,  middleEvent1, middleEvent2),
+			Sets.newHashSet(startEvent,  middleEvent1)
+		), resultingPatterns);
+	}
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9001c4ef/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java
index 9f65132..40a0e7e 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java
@@ -21,6 +21,7 @@ package org.apache.flink.cep.nfa;
 import org.apache.commons.io.output.ByteArrayOutputStream;
 import org.apache.flink.api.common.functions.FilterFunction;
 import org.apache.flink.cep.Event;
+import org.apache.flink.cep.pattern.FilterFunctions;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.util.TestLogger;
 import org.junit.Test;
@@ -51,12 +52,12 @@ public class NFATest extends TestLogger {
 		streamEvents.add(new StreamRecord<>(new Event(3, "start", 3.0), 3L));
 		streamEvents.add(new StreamRecord<>(new Event(4, "end", 4.0), 4L));
 
-		State<Event> startingState = new State<>("", State.StateType.Start);
-		State<Event> startState = new State<>("start", State.StateType.Normal);
-		State<Event> endState = new State<>("end", State.StateType.Final);
-		StateTransition<Event> starting2Start = new StateTransition<>(
-			StateTransitionAction.TAKE,
-			startState,
+		State<Event> startState = new State<>("start", State.StateType.Start);
+		State<Event> endState = new State<>("end", State.StateType.Normal);
+		State<Event> endingState = new State<>("", State.StateType.Final);
+
+		startState.addTake(
+			endState,
 			new FilterFunction<Event>() {
 				private static final long serialVersionUID = -4869589195918650396L;
 
@@ -64,12 +65,9 @@ public class NFATest extends TestLogger {
 				public boolean filter(Event value) throws Exception {
 					return value.getName().equals("start");
 				}
-			}
-		);
-
-		StateTransition<Event> start2End = new StateTransition<>(
-			StateTransitionAction.TAKE,
-			endState,
+			});
+		endState.addTake(
+			endingState,
 			new FilterFunction<Event>() {
 				private static final long serialVersionUID = 2979804163709590673L;
 
@@ -77,18 +75,12 @@ public class NFATest extends TestLogger {
 				public boolean filter(Event value) throws Exception {
 					return value.getName().equals("end");
 				}
-			}
-		);
-
-		StateTransition<Event> start2Start = new StateTransition<>(StateTransitionAction.IGNORE, startState, null);
-
-		startingState.addStateTransition(starting2Start);
-		startState.addStateTransition(start2End);
-		startState.addStateTransition(start2Start);
+			});
+		endState.addIgnore(FilterFunctions.<Event>trueFunction());
 
-		nfa.addState(startingState);
 		nfa.addState(startState);
 		nfa.addState(endState);
+		nfa.addState(endingState);
 
 		Set<Map<String, Event>> expectedPatterns = new HashSet<>();
 
@@ -196,8 +188,10 @@ public class NFATest extends TestLogger {
 	public <T> Collection<Map<String, T>> runNFA(NFA<T> nfa, List<StreamRecord<T>> inputs) {
 		Set<Map<String, T>> actualPatterns = new HashSet<>();
 
-		for (StreamRecord<T> streamEvent: inputs) {
-			Collection<Map<String, T>> matchedPatterns = nfa.process(streamEvent.getValue(), streamEvent.getTimestamp()).f0;
+		for (StreamRecord<T> streamEvent : inputs) {
+			Collection<Map<String, T>> matchedPatterns = nfa.process(
+				streamEvent.getValue(),
+				streamEvent.getTimestamp()).f0;
 
 			actualPatterns.addAll(matchedPatterns);
 		}
@@ -213,24 +207,12 @@ public class NFATest extends TestLogger {
 		State<Event> startState = new State<>("start", State.StateType.Normal);
 		State<Event> endState = new State<>("end", State.StateType.Final);
 
-		StateTransition<Event> starting2Start = new StateTransition<>(
-			StateTransitionAction.TAKE,
-			startState,
-			new NameFilter("start"));
 
-		StateTransition<Event> start2End = new StateTransition<>(
-			StateTransitionAction.TAKE,
-			endState,
+		startingState.addTake(
+			new NameFilter("start"));
+		startState.addTake(
 			new NameFilter("end"));
-
-		StateTransition<Event> start2Start = new StateTransition<>(
-			StateTransitionAction.IGNORE,
-			startState,
-			null);
-
-		startingState.addStateTransition(starting2Start);
-		startState.addStateTransition(start2End);
-		startState.addStateTransition(start2Start);
+		startState.addIgnore(null);
 
 		nfa.addState(startingState);
 		nfa.addState(startState);
@@ -253,12 +235,12 @@ public class NFATest extends TestLogger {
 	private NFA<Event> createStartEndNFA(long windowLength) {
 		NFA<Event> nfa = new NFA<>(Event.createTypeSerializer(), windowLength, false);
 
-		State<Event> startingState = new State<>("", State.StateType.Start);
-		State<Event> startState = new State<>("start", State.StateType.Normal);
-		State<Event> endState = new State<>("end", State.StateType.Final);
-		StateTransition<Event> starting2Start = new StateTransition<>(
-			StateTransitionAction.TAKE,
-			startState,
+		State<Event> startState = new State<>("start", State.StateType.Start);
+		State<Event> endState = new State<>("end", State.StateType.Normal);
+		State<Event> endingState = new State<>("", State.StateType.Final);
+
+		startState.addTake(
+			endState,
 			new FilterFunction<Event>() {
 				private static final long serialVersionUID = -4869589195918650396L;
 
@@ -267,10 +249,8 @@ public class NFATest extends TestLogger {
 					return value.getName().equals("start");
 				}
 			});
-
-		StateTransition<Event> start2End = new StateTransition<>(
-			StateTransitionAction.TAKE,
-			endState,
+		endState.addTake(
+			endingState,
 			new FilterFunction<Event>() {
 				private static final long serialVersionUID = 2979804163709590673L;
 
@@ -279,19 +259,11 @@ public class NFATest extends TestLogger {
 					return value.getName().equals("end");
 				}
 			});
+		endState.addIgnore(FilterFunctions.<Event>trueFunction());
 
-		StateTransition<Event> start2Start = new StateTransition<>(
-			StateTransitionAction.IGNORE,
-			startState,
-			null);
-
-		startingState.addStateTransition(starting2Start);
-		startState.addStateTransition(start2End);
-		startState.addStateTransition(start2Start);
-
-		nfa.addState(startingState);
 		nfa.addState(startState);
 		nfa.addState(endState);
+		nfa.addState(endingState);
 
 		return nfa;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/9001c4ef/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java
index d11f3a8..93d78cc 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java
@@ -18,9 +18,11 @@
 
 package org.apache.flink.cep.nfa.compiler;
 
+import com.google.common.collect.Sets;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.FilterFunction;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.cep.Event;
 import org.apache.flink.cep.SubEvent;
@@ -28,22 +30,45 @@ import org.apache.flink.cep.nfa.NFA;
 import org.apache.flink.cep.nfa.State;
 import org.apache.flink.cep.nfa.StateTransition;
 import org.apache.flink.cep.nfa.StateTransitionAction;
+import org.apache.flink.cep.pattern.MalformedPatternException;
 import org.apache.flink.cep.pattern.Pattern;
 import org.apache.flink.util.TestLogger;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
 
-import java.util.Collection;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 
-import static org.junit.Assert.assertTrue;
+import static com.google.common.collect.Sets.newHashSet;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 public class NFACompilerTest extends TestLogger {
 
+	private static final FilterFunction<Event> startFilter = new FilterFunction<Event>() {
+		private static final long serialVersionUID = 3314714776170474221L;
+
+		@Override
+		public boolean filter(Event value) throws Exception {
+			return value.getPrice() > 2;
+		}
+	};
+
+	private static final FilterFunction<Event> endFilter = new FilterFunction<Event>() {
+		private static final long serialVersionUID = 3990995859716364087L;
+
+		@Override
+		public boolean filter(Event value) throws Exception {
+			return value.getName().equals("end");
+		}
+	};
+
+	private static final TypeSerializer<Event> serializer = TypeExtractor.createTypeInfo(Event.class)
+		.createSerializer(new ExecutionConfig());
+
 	@Rule
 	public ExpectedException expectedException = ExpectedException.none();
 
@@ -81,83 +106,96 @@ public class NFACompilerTest extends TestLogger {
 	 */
 	@Test
 	public void testNFACompilerWithSimplePattern() {
-		Pattern<Event, Event> pattern = Pattern.<Event>begin("start").where(new FilterFunction<Event>() {
-			private static final long serialVersionUID = 3314714776170474221L;
-
-			@Override
-			public boolean filter(Event value) throws Exception {
-				return value.getPrice() > 2;
-			}
-		})
-		.followedBy("middle").subtype(SubEvent.class)
-		.next("end").where(new FilterFunction<Event>() {
-				private static final long serialVersionUID = 3990995859716364087L;
+		Pattern<Event, Event> pattern = Pattern.<Event>begin("start").where(startFilter)
+			.followedBy("middle").subtype(SubEvent.class)
+			.next("end").where(endFilter);
 
-				@Override
-			public boolean filter(Event value) throws Exception {
-				return value.getName().equals("end");
-			}
-		});
-
-		TypeInformation<Event> typeInformation = TypeExtractor.createTypeInfo(Event.class);
-
-		NFA<Event> nfa = NFACompiler.compile(pattern, typeInformation.createSerializer(new ExecutionConfig()), false);
+		NFA<Event> nfa = NFACompiler.compile(pattern, serializer, false);
 
 		Set<State<Event>> states = nfa.getStates();
-
 		assertEquals(4, states.size());
 
 		Map<String, State<Event>> stateMap = new HashMap<>();
-
-		for (State<Event> state: states) {
+		for (State<Event> state : states) {
 			stateMap.put(state.getName(), state);
 		}
 
-		assertTrue(stateMap.containsKey(NFACompiler.BEGINNING_STATE_NAME));
-		State<Event> beginningState = stateMap.get(NFACompiler.BEGINNING_STATE_NAME);
-
-		assertTrue(beginningState.isStart());
-
 		assertTrue(stateMap.containsKey("start"));
 		State<Event> startState = stateMap.get("start");
+		assertTrue(startState.isStart());
+		final Set<Tuple2<String, StateTransitionAction>> startTransitions = unfoldTransitions(startState);
+		assertEquals(newHashSet(
+			Tuple2.of("middle", StateTransitionAction.TAKE)
+		), startTransitions);
 
-		Collection<StateTransition<Event>> startTransitions = startState.getStateTransitions();
-		Map<String, StateTransition<Event>> startTransitionMap = new HashMap<>();
+		assertTrue(stateMap.containsKey("middle"));
+		State<Event> middleState = stateMap.get("middle");
+		final Set<Tuple2<String, StateTransitionAction>> middleTransitions = unfoldTransitions(middleState);
+		assertEquals(newHashSet(
+			Tuple2.of("middle", StateTransitionAction.IGNORE),
+			Tuple2.of("end", StateTransitionAction.TAKE)
+		), middleTransitions);
 
-		for (StateTransition<Event> transition: startTransitions) {
-			startTransitionMap.put(transition.getTargetState().getName(), transition);
-		}
+		assertTrue(stateMap.containsKey("end"));
+		State<Event> endState = stateMap.get("end");
+		final Set<Tuple2<String, StateTransitionAction>> endTransitions = unfoldTransitions(endState);
+		assertEquals(newHashSet(
+			Tuple2.of(NFACompiler.ENDING_STATE_NAME, StateTransitionAction.TAKE)
+		), endTransitions);
+
+		assertTrue(stateMap.containsKey(NFACompiler.ENDING_STATE_NAME));
+		State<Event> endingState = stateMap.get(NFACompiler.ENDING_STATE_NAME);
+		assertTrue(endingState.isFinal());
+		assertEquals(0, endingState.getStateTransitions().size());
+	}
 
-		assertEquals(2, startTransitionMap.size());
-		assertTrue(startTransitionMap.containsKey("start"));
+	@Test
+	public void testNFACompilerWithKleeneStar() {
 
-		StateTransition<Event> reflexiveTransition = startTransitionMap.get("start");
-		assertEquals(StateTransitionAction.IGNORE, reflexiveTransition.getAction());
+		Pattern<Event, Event> pattern = Pattern.<Event>begin("start").where(startFilter)
+			.followedBy("middle").subtype(SubEvent.class).zeroOrMore()
+			.followedBy("end").where(endFilter);
 
-		assertTrue(startTransitionMap.containsKey("middle"));
-		StateTransition<Event> startMiddleTransition = startTransitionMap.get("middle");
-		assertEquals(StateTransitionAction.TAKE, startMiddleTransition.getAction());
+		NFA<Event> nfa = NFACompiler.compile(pattern, serializer, false);
 
-		assertTrue(stateMap.containsKey("middle"));
-		State<Event> middleState = stateMap.get("middle");
+		Set<State<Event>> states = nfa.getStates();
+		assertEquals(5, states.size());
 
-		Map<String, StateTransition<Event>> middleTransitionMap = new HashMap<>();
 
-		for (StateTransition<Event> transition: middleState.getStateTransitions()) {
-			middleTransitionMap.put(transition.getTargetState().getName(), transition);
+		Set<Tuple2<String, Set<Tuple2<String, StateTransitionAction>>>> stateMap = new HashSet<>();
+		for (State<Event> state : states) {
+			stateMap.add(Tuple2.of(state.getName(), unfoldTransitions(state)));
 		}
 
-		assertEquals(1, middleTransitionMap.size());
+		assertEquals(stateMap, newHashSet(
+			Tuple2.of("start", newHashSet(Tuple2.of("middle", StateTransitionAction.TAKE))),
+			Tuple2.of("middle", newHashSet(
+				Tuple2.of("middle", StateTransitionAction.IGNORE),
+				Tuple2.of("middle", StateTransitionAction.TAKE)
+			)),
+		    Tuple2.of("middle", newHashSet(
+			    Tuple2.of("middle", StateTransitionAction.IGNORE),
+			    Tuple2.of("middle", StateTransitionAction.TAKE),
+			    Tuple2.of("end", StateTransitionAction.PROCEED)
+		    )),
+			Tuple2.of("end", newHashSet(
+				Tuple2.of(NFACompiler.ENDING_STATE_NAME, StateTransitionAction.TAKE),
+				Tuple2.of("end", StateTransitionAction.IGNORE)
+			)),
+		    Tuple2.of(NFACompiler.ENDING_STATE_NAME, Sets.newHashSet())
+		));
 
-		assertTrue(middleTransitionMap.containsKey("end"));
-		StateTransition<Event> middleEndTransition = middleTransitionMap.get("end");
+	}
 
-		assertEquals(StateTransitionAction.TAKE, middleEndTransition.getAction());
 
-		assertTrue(stateMap.containsKey("end"));
-		State<Event> endState = stateMap.get("end");
-
-		assertTrue(endState.isFinal());
-		assertEquals(0, endState.getStateTransitions().size());
+	private <T> Set<Tuple2<String, StateTransitionAction>> unfoldTransitions(final State<T> state) {
+		final Set<Tuple2<String, StateTransitionAction>> transitions = new HashSet<>();
+		for (StateTransition<T> transition : state.getStateTransitions()) {
+			transitions.add(Tuple2.of(
+				transition.getTargetState().getName(),
+				transition.getAction()));
+		}
+		return transitions;
 	}
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9001c4ef/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/pattern/PatternTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/pattern/PatternTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/pattern/PatternTest.java
index 98c3f5a..68b0419 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/pattern/PatternTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/pattern/PatternTest.java
@@ -184,4 +184,58 @@ public class PatternTest extends TestLogger {
 		assertEquals(previous2.getName(), "start");
 	}
 
+	@Test(expected = MalformedPatternException.class)
+	public void testPatternCanHaveQuantifierSpecifiedOnce1() throws Exception {
+
+		Pattern.begin("start").where(new FilterFunction<Object>() {
+			@Override
+			public boolean filter(Object value) throws Exception {
+				return true;
+			}
+		}).oneOrMore().zeroOrMore();
+	}
+
+	@Test(expected = MalformedPatternException.class)
+	public void testPatternCanHaveQuantifierSpecifiedOnce2() throws Exception {
+
+		Pattern.begin("start").where(new FilterFunction<Object>() {
+			@Override
+			public boolean filter(Object value) throws Exception {
+				return true;
+			}
+		}).zeroOrMore().times(1);
+	}
+
+	@Test(expected = MalformedPatternException.class)
+	public void testPatternCanHaveQuantifierSpecifiedOnce3() throws Exception {
+
+		Pattern.begin("start").where(new FilterFunction<Object>() {
+			@Override
+			public boolean filter(Object value) throws Exception {
+				return true;
+			}
+		}).times(1).oneOrMore();
+	}
+
+	@Test(expected = MalformedPatternException.class)
+	public void testPatternCanHaveQuantifierSpecifiedOnce4() throws Exception {
+
+		Pattern.begin("start").where(new FilterFunction<Object>() {
+			@Override
+			public boolean filter(Object value) throws Exception {
+				return true;
+			}
+		}).oneOrMore().oneOrMore(true);
+	}
+
+	@Test(expected = MalformedPatternException.class)
+	public void testPatternCanHaveQuantifierSpecifiedOnce5() throws Exception {
+
+		Pattern.begin("start").where(new FilterFunction<Object>() {
+			@Override
+			public boolean filter(Object value) throws Exception {
+				return true;
+			}
+		}).oneOrMore(true).zeroOrMore(true);
+	}
 }


Mime
View raw message