flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From fhue...@apache.org
Subject [01/50] [abbrv] flink git commit: [FLINK-6208] [cep] Implement skip till next match strategy [Forced Update!]
Date Wed, 03 May 2017 12:10:08 GMT
Repository: flink
Updated Branches:
  refs/heads/table-retraction b237a3ef0 -> e265620d4 (forced update)


http://git-wip-us.apache.org/repos/asf/flink/blob/7c35dc0e/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
index f4bf647..4a00c1e 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
@@ -26,6 +26,7 @@ import org.apache.flink.cep.Event;
 import org.apache.flink.cep.SubEvent;
 import org.apache.flink.cep.nfa.compiler.NFACompiler;
 import org.apache.flink.cep.pattern.Pattern;
+import org.apache.flink.cep.pattern.Quantifier;
 import org.apache.flink.cep.pattern.conditions.IterativeCondition;
 import org.apache.flink.cep.pattern.conditions.SimpleCondition;
 import org.apache.flink.streaming.api.windowing.time.Time;
@@ -46,6 +47,7 @@ import java.util.Set;
 
 import static org.junit.Assert.assertEquals;
 
+@SuppressWarnings("unchecked")
 public class NFAITCase extends TestLogger {
 
 	@Test
@@ -56,12 +58,12 @@ public class NFAITCase extends TestLogger {
 		SubEvent middleEvent = new SubEvent(42, "foo", 1.0, 10.0);
 		Event endEvent = new Event(43, "end", 1.0);
 
-		inputEvents.add(new StreamRecord<Event>(startEvent, 1));
-		inputEvents.add(new StreamRecord<Event>(new Event(43, "foobar", 1.0), 2));
+		inputEvents.add(new StreamRecord<>(startEvent, 1));
+		inputEvents.add(new StreamRecord<>(new Event(43, "foobar", 1.0), 2));
 		inputEvents.add(new StreamRecord<Event>(new SubEvent(41, "barfoo", 1.0, 5.0), 3));
 		inputEvents.add(new StreamRecord<Event>(middleEvent, 3));
-		inputEvents.add(new StreamRecord<Event>(new Event(43, "start", 1.0), 4));
-		inputEvents.add(new StreamRecord<Event>(endEvent, 5));
+		inputEvents.add(new StreamRecord<>(new Event(43, "start", 1.0), 4));
+		inputEvents.add(new StreamRecord<>(endEvent, 5));
 
 		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
 			private static final long serialVersionUID = 5726188262756267490L;
@@ -212,12 +214,12 @@ public class NFAITCase extends TestLogger {
 		final Event middleEvent;
 		final Event endEvent;
 
-		events.add(new StreamRecord<Event>(new Event(1, "start", 1.0), 1));
-		events.add(new StreamRecord<Event>(startEvent = new Event(2, "start", 1.0), 2));
-		events.add(new StreamRecord<Event>(middleEvent = new Event(3, "middle", 1.0), 3));
-		events.add(new StreamRecord<Event>(new Event(4, "foobar", 1.0), 4));
-		events.add(new StreamRecord<Event>(endEvent = new Event(5, "end", 1.0), 11));
-		events.add(new StreamRecord<Event>(new Event(6, "end", 1.0), 13));
+		events.add(new StreamRecord<>(new Event(1, "start", 1.0), 1));
+		events.add(new StreamRecord<>(startEvent = new Event(2, "start", 1.0), 2));
+		events.add(new StreamRecord<>(middleEvent = new Event(3, "middle", 1.0), 3));
+		events.add(new StreamRecord<>(new Event(4, "foobar", 1.0), 4));
+		events.add(new StreamRecord<>(endEvent = new Event(5, "end", 1.0), 11));
+		events.add(new StreamRecord<>(new Event(6, "end", 1.0), 13));
 
 
 		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
@@ -273,12 +275,12 @@ public class NFAITCase extends TestLogger {
 		Set<Tuple2<Map<String, Event>, Long>> resultingTimeoutPatterns = new HashSet<>();
 		Set<Tuple2<Map<String, Event>, Long>> expectedTimeoutPatterns = new HashSet<>();
 
-		events.add(new StreamRecord<Event>(new Event(1, "start", 1.0), 1));
-		events.add(new StreamRecord<Event>(new Event(2, "start", 1.0), 2));
-		events.add(new StreamRecord<Event>(new Event(3, "middle", 1.0), 3));
-		events.add(new StreamRecord<Event>(new Event(4, "foobar", 1.0), 4));
-		events.add(new StreamRecord<Event>(new Event(5, "end", 1.0), 11));
-		events.add(new StreamRecord<Event>(new Event(6, "end", 1.0), 13));
+		events.add(new StreamRecord<>(new Event(1, "start", 1.0), 1));
+		events.add(new StreamRecord<>(new Event(2, "start", 1.0), 2));
+		events.add(new StreamRecord<>(new Event(3, "middle", 1.0), 3));
+		events.add(new StreamRecord<>(new Event(4, "foobar", 1.0), 4));
+		events.add(new StreamRecord<>(new Event(5, "end", 1.0), 11));
+		events.add(new StreamRecord<>(new Event(6, "end", 1.0), 13));
 
 		Map<String, Event> timeoutPattern1 = new HashMap<>();
 		timeoutPattern1.put("start", new Event(1, "start", 1.0));
@@ -306,14 +308,14 @@ public class NFAITCase extends TestLogger {
 			public boolean filter(Event value) throws Exception {
 				return value.getName().equals("start");
 			}
-		}).followedBy("middle").where(new SimpleCondition<Event>() {
+		}).followedByAny("middle").where(new SimpleCondition<Event>() {
 			private static final long serialVersionUID = -3268741540234334074L;
 
 			@Override
 			public boolean filter(Event value) throws Exception {
 				return value.getName().equals("middle");
 			}
-		}).followedBy("end").where(new SimpleCondition<Event>() {
+		}).followedByAny("end").where(new SimpleCondition<Event>() {
 			private static final long serialVersionUID = -8995174172182138608L;
 
 			@Override
@@ -353,13 +355,13 @@ public class NFAITCase extends TestLogger {
 		SubEvent nextOne2 = new SubEvent(45, "next-one", 1.0, 2.0);
 		Event endEvent=  new Event(46, "end", 1.0);
 
-		inputEvents.add(new StreamRecord<Event>(startEvent, 1));
+		inputEvents.add(new StreamRecord<>(startEvent, 1));
 		inputEvents.add(new StreamRecord<Event>(middleEvent1, 3));
 		inputEvents.add(new StreamRecord<Event>(middleEvent2, 4));
 		inputEvents.add(new StreamRecord<Event>(middleEvent3, 5));
 		inputEvents.add(new StreamRecord<Event>(nextOne1, 6));
 		inputEvents.add(new StreamRecord<Event>(nextOne2, 7));
-		inputEvents.add(new StreamRecord<Event>(endEvent, 8));
+		inputEvents.add(new StreamRecord<>(endEvent, 8));
 
 		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
 			private static final long serialVersionUID = 5726188262756267490L;
@@ -368,21 +370,21 @@ public class NFAITCase extends TestLogger {
 			public boolean filter(Event value) throws Exception {
 				return value.getName().equals("start");
 			}
-		}).followedBy("middle-first").subtype(SubEvent.class).where(new SimpleCondition<SubEvent>() {
+		}).followedByAny("middle-first").subtype(SubEvent.class).where(new SimpleCondition<SubEvent>() {
 			private static final long serialVersionUID = 6215754202506583964L;
 
 			@Override
 			public boolean filter(SubEvent value) throws Exception {
 				return value.getVolume() > 5.0;
 			}
-		}).followedBy("middle-second").subtype(SubEvent.class).where(new SimpleCondition<SubEvent>() {
+		}).followedByAny("middle-second").subtype(SubEvent.class).where(new SimpleCondition<SubEvent>() {
 			private static final long serialVersionUID = 6215754202506583964L;
 
 			@Override
 			public boolean filter(SubEvent value) throws Exception {
 				return value.getName().equals("next-one");
 			}
-		}).followedBy("end").where(new SimpleCondition<Event>() {
+		}).followedByAny("end").where(new SimpleCondition<Event>() {
 			private static final long serialVersionUID = 7056763917392056548L;
 
 			@Override
@@ -449,28 +451,28 @@ public class NFAITCase extends TestLogger {
 			public boolean filter(Event value) throws Exception {
 				return value.getName().equals("c");
 			}
-		}).followedBy("middle").where(new SimpleCondition<Event>() {
+		}).followedByAny("middle").where(new SimpleCondition<Event>() {
 			private static final long serialVersionUID = 5726188262756267490L;
 
 			@Override
 			public boolean filter(Event value) throws Exception {
 				return value.getName().equals("a");
 			}
-		}).oneOrMore().allowCombinations().optional().followedBy("end1").where(new SimpleCondition<Event>() {
+		}).oneOrMore().allowCombinations().optional().followedByAny("end1").where(new SimpleCondition<Event>() {
 			private static final long serialVersionUID = 5726188262756267490L;
 
 			@Override
 			public boolean filter(Event value) throws Exception {
 				return value.getName().equals("b");
 			}
-		}).followedBy("end2").where(new SimpleCondition<Event>() {
+		}).followedByAny("end2").where(new SimpleCondition<Event>() {
 			private static final long serialVersionUID = 5726188262756267490L;
 
 			@Override
 			public boolean filter(Event value) throws Exception {
 				return value.getName().equals("d");
 			}
-		}).followedBy("end3").where(new SimpleCondition<Event>() {
+		}).followedByAny("end3").where(new SimpleCondition<Event>() {
 			private static final long serialVersionUID = 5726188262756267490L;
 
 			@Override
@@ -537,7 +539,7 @@ public class NFAITCase extends TestLogger {
 			public boolean filter(Event value) throws Exception {
 				return value.getName().equals("c");
 			}
-		}).followedBy("middle").where(new SimpleCondition<Event>() {
+		}).followedByAny("middle").where(new SimpleCondition<Event>() {
 			private static final long serialVersionUID = 5726188262756267490L;
 
 			@Override
@@ -555,27 +557,14 @@ public class NFAITCase extends TestLogger {
 
 		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
 
-		Set<Set<Event>> resultingPatterns = new HashSet<>();
-		List<Collection<Event>> allPatterns = new ArrayList<>();
-
-		for (StreamRecord<Event> inputEvent : inputEvents) {
-			Collection<Map<String, Event>> patterns = nfa.process(
-				inputEvent.getValue(),
-				inputEvent.getTimestamp()).f0;
-
-			for (Map<String, Event> foundPattern : patterns) {
-				resultingPatterns.add(new HashSet<>(foundPattern.values()));
-				allPatterns.add(foundPattern.values());
-			}
-		}
+		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
 
-		assertEquals(4, allPatterns.size());
-		assertEquals(Sets.newHashSet(
-			Sets.newHashSet(startEvent, middleEvent1, middleEvent2, end1),
-			Sets.newHashSet(startEvent, middleEvent1, end1),
-			Sets.newHashSet(startEvent, middleEvent2, end1),
-			Sets.newHashSet(startEvent, end1)
-		), resultingPatterns);
+		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
+			Lists.newArrayList(startEvent, middleEvent1, middleEvent2, end1),
+			Lists.newArrayList(startEvent, middleEvent1, end1),
+			Lists.newArrayList(startEvent, middleEvent2, end1),
+			Lists.newArrayList(startEvent, end1)
+		));
 	}
 
 	@Test
@@ -724,14 +713,14 @@ public class NFAITCase extends TestLogger {
 			public boolean filter(Event value) throws Exception {
 				return value.getName().equals("c");
 			}
-		}).followedBy("middle-first").where(new SimpleCondition<Event>() {
+		}).followedByAny("middle-first").where(new SimpleCondition<Event>() {
 			private static final long serialVersionUID = 5726188262756267490L;
 
 			@Override
 			public boolean filter(Event value) throws Exception {
 				return value.getName().equals("a");
 			}
-		}).oneOrMore().allowCombinations().optional().followedBy("middle-second").where(new SimpleCondition<Event>() {
+		}).oneOrMore().allowCombinations().optional().followedByAny("middle-second").where(new SimpleCondition<Event>() {
 			private static final long serialVersionUID = 5726188262756267490L;
 
 			@Override
@@ -803,21 +792,21 @@ public class NFAITCase extends TestLogger {
 			public boolean filter(Event value) throws Exception {
 				return value.getName().equals("c");
 			}
-		}).followedBy("branching").where(new SimpleCondition<Event>() {
+		}).followedByAny("branching").where(new SimpleCondition<Event>() {
 			private static final long serialVersionUID = 5726188262756267490L;
 
 			@Override
 			public boolean filter(Event value) throws Exception {
 				return value.getName().equals("a");
 			}
-		}).followedBy("merging").where(new SimpleCondition<Event>() {
+		}).followedByAny("merging").where(new SimpleCondition<Event>() {
 			private static final long serialVersionUID = 5726188262756267490L;
 
 			@Override
 			public boolean filter(Event value) throws Exception {
 				return value.getName().equals("f");
 			}
-		}).followedBy("kleene").where(new SimpleCondition<Event>() {
+		}).followedByAny("kleene").where(new SimpleCondition<Event>() {
 			private static final long serialVersionUID = 5726188262756267490L;
 
 			@Override
@@ -940,7 +929,7 @@ public class NFAITCase extends TestLogger {
 			public boolean filter(Event value) throws Exception {
 				return value.getName().equals("d");
 			}
-		}).followedBy("middle").where(new SimpleCondition<Event>() {
+		}).followedByAny("middle").where(new SimpleCondition<Event>() {
 			private static final long serialVersionUID = 5726188262756267490L;
 
 			@Override
@@ -958,25 +947,12 @@ public class NFAITCase extends TestLogger {
 
 		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
 
-		Set<Set<Event>> resultingPatterns = new HashSet<>();
-		List<Collection<Event>> allPatterns = new ArrayList<>();
-
-		for (StreamRecord<Event> inputEvent : inputEvents) {
-			Collection<Map<String, Event>> patterns = nfa.process(
-				inputEvent.getValue(),
-				inputEvent.getTimestamp()).f0;
-
-			for (Map<String, Event> foundPattern : patterns) {
-				resultingPatterns.add(new HashSet<>(foundPattern.values()));
-				allPatterns.add(foundPattern.values());
-			}
-		}
+		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
 
-		assertEquals(2, allPatterns.size());
-		assertEquals(Sets.newHashSet(
-			Sets.newHashSet(start, middleEvent1, middleEvent2, end),
-			Sets.newHashSet(start, middleEvent2, end)
-		), resultingPatterns);
+		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
+			Lists.newArrayList(start, middleEvent1, middleEvent2, end),
+			Lists.newArrayList(start, middleEvent2, end)
+		));
 	}
 
 	@Test
@@ -1000,14 +976,14 @@ public class NFAITCase extends TestLogger {
 			public boolean filter(Event value) throws Exception {
 				return value.getName().equals("c");
 			}
-		}).followedBy("middle").where(new SimpleCondition<Event>() {
+		}).followedByAny("middle").where(new SimpleCondition<Event>() {
 			private static final long serialVersionUID = 5726188262756267490L;
 
 			@Override
 			public boolean filter(Event value) throws Exception {
 				return value.getName().equals("a");
 			}
-		}).oneOrMore().allowCombinations().followedBy("end1").where(new SimpleCondition<Event>() {
+		}).oneOrMore().allowCombinations().followedByAny("end1").where(new SimpleCondition<Event>() {
 			private static final long serialVersionUID = 5726188262756267490L;
 
 			@Override
@@ -1183,14 +1159,14 @@ public class NFAITCase extends TestLogger {
 			public boolean filter(Event value) throws Exception {
 				return value.getName().equals("c");
 			}
-		}).followedBy("middle").where(new SimpleCondition<Event>() {
+		}).followedByAny("middle").where(new SimpleCondition<Event>() {
 			private static final long serialVersionUID = 5726188262756267490L;
 
 			@Override
 			public boolean filter(Event value) throws Exception {
 				return value.getName().equals("a");
 			}
-		}).oneOrMore().followedBy("end1").where(new SimpleCondition<Event>() {
+		}).oneOrMore().followedByAny("end1").where(new SimpleCondition<Event>() {
 			private static final long serialVersionUID = 5726188262756267490L;
 
 			@Override
@@ -1201,26 +1177,16 @@ public class NFAITCase extends TestLogger {
 
 		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
 
-		Set<Set<Event>> resultingPatterns = new HashSet<>();
-		List<Collection<Event>> allPatterns = new ArrayList<>();
+		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
 
-		for (StreamRecord<Event> inputEvent : inputEvents) {
-			Collection<Map<String, Event>> patterns = nfa.process(
-				inputEvent.getValue(),
-				inputEvent.getTimestamp()).f0;
-
-			for (Map<String, Event> foundPattern : patterns) {
-				resultingPatterns.add(new HashSet<>(foundPattern.values()));
-				allPatterns.add(foundPattern.values());
-			}
-		}
-
-		assertEquals(3, allPatterns.size());
-		assertEquals(Sets.newHashSet(
-			Sets.newHashSet(startEvent, middleEvent1, middleEvent2, middleEvent3, end1),
-			Sets.newHashSet(startEvent, middleEvent1, middleEvent2, end1),
-			Sets.newHashSet(startEvent, middleEvent1, end1)
-		), resultingPatterns);
+		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
+			Lists.newArrayList(startEvent, middleEvent1, middleEvent2, middleEvent3, end1),
+			Lists.newArrayList(startEvent, middleEvent1, middleEvent2, end1),
+			Lists.newArrayList(startEvent, middleEvent2, middleEvent3, end1),
+			Lists.newArrayList(startEvent, middleEvent3, end1),
+			Lists.newArrayList(startEvent, middleEvent2, end1),
+			Lists.newArrayList(startEvent, middleEvent1, end1)
+		));
 	}
 
 	@Test
@@ -1322,25 +1288,12 @@ public class NFAITCase extends TestLogger {
 
 		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
 
-		Set<Set<Event>> resultingPatterns = new HashSet<>();
-		List<Collection<Event>> allPatterns = new ArrayList<>();
+		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
 
-		for (StreamRecord<Event> inputEvent : inputEvents) {
-			Collection<Map<String, Event>> patterns = nfa.process(
-				inputEvent.getValue(),
-				inputEvent.getTimestamp()).f0;
-
-			for (Map<String, Event> foundPattern : patterns) {
-				resultingPatterns.add(new HashSet<>(foundPattern.values()));
-				allPatterns.add(foundPattern.values());
-			}
-		}
-
-		assertEquals(2, allPatterns.size());
-		assertEquals(Sets.<Set<Event>>newHashSet(
-			Sets.newHashSet(startEvent, middleEvent1, middleEvent2, end1),
-			Sets.newHashSet(startEvent, middleEvent1, middleEvent3, end1)
-		), resultingPatterns);
+		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
+			Lists.newArrayList(startEvent, middleEvent1, middleEvent2, end1),
+			Lists.newArrayList(startEvent, middleEvent1, middleEvent3, end1)
+		));
 	}
 
 	@Test
@@ -1375,25 +1328,13 @@ public class NFAITCase extends TestLogger {
 
 		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
 
-		Set<Set<Event>> resultingPatterns = new HashSet<>();
-		List<Collection<Event>> allPatterns = new ArrayList<>();
-
-		for (StreamRecord<Event> inputEvent : inputEvents) {
-			Collection<Map<String, Event>> patterns = nfa.process(
-				inputEvent.getValue(),
-				inputEvent.getTimestamp()).f0;
+		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
 
-			for (Map<String, Event> foundPattern : patterns) {
-				resultingPatterns.add(new HashSet<>(foundPattern.values()));
-				allPatterns.add(foundPattern.values());
-			}
-		}
+		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
+			Lists.newArrayList(middleEvent1, middleEvent2, end1),
+			Lists.newArrayList(middleEvent2, middleEvent3, end1)
+		));
 
-		assertEquals(2, allPatterns.size());
-		assertEquals(Sets.<Set<Event>>newHashSet(
-			Sets.newHashSet(middleEvent1, middleEvent2, end1),
-			Sets.newHashSet(middleEvent2, middleEvent3, end1)
-		), resultingPatterns);
 	}
 
 	@Test
@@ -1442,49 +1383,6 @@ public class NFAITCase extends TestLogger {
 	}
 
 	@Test
-	public void testTimesStrictWithFollowedBy() {
-		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
-
-		inputEvents.add(new StreamRecord<>(ConsecutiveData.startEvent, 1));
-		inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent1, 2));
-		inputEvents.add(new StreamRecord<>(new Event(23, "f", 1.0), 3));
-		inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent2, 4));
-		inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent3, 6));
-		inputEvents.add(new StreamRecord<>(ConsecutiveData.end, 7));
-
-		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
-			private static final long serialVersionUID = 5726188262756267490L;
-
-			@Override
-			public boolean filter(Event value) throws Exception {
-				return value.getName().equals("c");
-			}
-		}).followedBy("middle").where(new SimpleCondition<Event>() {
-			private static final long serialVersionUID = 5726188262756267490L;
-
-			@Override
-			public boolean filter(Event value) throws Exception {
-				return value.getName().equals("a");
-			}
-		}).times(2).allowCombinations().consecutive().followedBy("end1").where(new SimpleCondition<Event>() {
-			private static final long serialVersionUID = 5726188262756267490L;
-
-			@Override
-			public boolean filter(Event value) throws Exception {
-				return value.getName().equals("b");
-			}
-		});
-
-		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
-
-		List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
-
-		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
-			Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent3, ConsecutiveData.middleEvent2, ConsecutiveData.end)
-		));
-	}
-
-	@Test
 	public void testTimesNotStrictWithFollowedByEager() {
 		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
 
@@ -1543,7 +1441,7 @@ public class NFAITCase extends TestLogger {
 			public boolean filter(Event value) throws Exception {
 				return value.getName().equals("c");
 			}
-		}).followedBy("middle").where(new SimpleCondition<Event>() {
+		}).followedByAny("middle").where(new SimpleCondition<Event>() {
 			private static final long serialVersionUID = 5726188262756267490L;
 
 			@Override
@@ -1571,49 +1469,6 @@ public class NFAITCase extends TestLogger {
 	}
 
 	@Test
-	public void testTimesStrictWithFollowedByNotEager() {
-		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
-
-		inputEvents.add(new StreamRecord<>(ConsecutiveData.startEvent, 1));
-		inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent1, 2));
-		inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent2, 4));
-		inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent3, 6));
-		inputEvents.add(new StreamRecord<>(ConsecutiveData.end, 7));
-
-		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
-			private static final long serialVersionUID = 5726188262756267490L;
-
-			@Override
-			public boolean filter(Event value) throws Exception {
-				return value.getName().equals("c");
-			}
-		}).followedBy("middle").where(new SimpleCondition<Event>() {
-			private static final long serialVersionUID = 5726188262756267490L;
-
-			@Override
-			public boolean filter(Event value) throws Exception {
-				return value.getName().equals("a");
-			}
-		}).times(2).allowCombinations().consecutive().followedBy("end1").where(new SimpleCondition<Event>() {
-			private static final long serialVersionUID = 5726188262756267490L;
-
-			@Override
-			public boolean filter(Event value) throws Exception {
-				return value.getName().equals("b");
-			}
-		});
-
-		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
-
-		List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
-
-		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
-				Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, ConsecutiveData.end),
-				Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent2, ConsecutiveData.middleEvent3, ConsecutiveData.end)
-		));
-	}
-
-	@Test
 	public void testTimesStrictWithNextAndConsecutive() {
 		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
 
@@ -1789,29 +1644,16 @@ public class NFAITCase extends TestLogger {
 
 		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
 
-		Set<Set<Event>> resultingPatterns = new HashSet<>();
-		List<Collection<Event>> allPatterns = new ArrayList<>();
-
-		for (StreamRecord<Event> inputEvent : inputEvents) {
-			Collection<Map<String, Event>> patterns = nfa.process(
-				inputEvent.getValue(),
-				inputEvent.getTimestamp()).f0;
+		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
 
-			for (Map<String, Event> foundPattern : patterns) {
-				resultingPatterns.add(new HashSet<>(foundPattern.values()));
-				allPatterns.add(foundPattern.values());
-			}
-		}
-
-		assertEquals(6, allPatterns.size());
-		assertEquals(Sets.newHashSet(
-			Sets.newHashSet(middleEvent1,  middleEvent2, middleEvent3),
-			Sets.newHashSet(middleEvent1,  middleEvent2),
-			Sets.newHashSet(middleEvent1),
-			Sets.newHashSet(middleEvent2,  middleEvent3),
-			Sets.newHashSet(middleEvent2),
-			Sets.newHashSet(middleEvent3)
-		), resultingPatterns);
+		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
+			Lists.newArrayList(middleEvent1,  middleEvent2, middleEvent3),
+			Lists.newArrayList(middleEvent1,  middleEvent2),
+			Lists.newArrayList(middleEvent1),
+			Lists.newArrayList(middleEvent2,  middleEvent3),
+			Lists.newArrayList(middleEvent2),
+			Lists.newArrayList(middleEvent3)
+		));
 	}
 
 	@Test
@@ -1895,26 +1737,13 @@ public class NFAITCase extends TestLogger {
 
 		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
 
-		Set<Set<Event>> resultingPatterns = new HashSet<>();
-		List<Collection<Event>> allPatterns = new ArrayList<>();
-
-		for (StreamRecord<Event> inputEvent : inputEvents) {
-			Collection<Map<String, Event>> patterns = nfa.process(
-				inputEvent.getValue(),
-				inputEvent.getTimestamp()).f0;
-
-			for (Map<String, Event> foundPattern : patterns) {
-				resultingPatterns.add(new HashSet<>(foundPattern.values()));
-				allPatterns.add(foundPattern.values());
-			}
-		}
+		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
 
-		assertEquals(3, allPatterns.size());
-		assertEquals(Sets.newHashSet(
-			Sets.newHashSet(startEvent,  middleEvent1, middleEvent2, middleEvent3),
-			Sets.newHashSet(startEvent,  middleEvent1, middleEvent2),
-			Sets.newHashSet(startEvent,  middleEvent1)
-		), resultingPatterns);
+		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
+			Lists.newArrayList(startEvent,  middleEvent1, middleEvent2, middleEvent3),
+			Lists.newArrayList(startEvent,  middleEvent1, middleEvent2),
+			Lists.newArrayList(startEvent,  middleEvent1)
+		));
 	}
 
 	///////////////////////////////         Optional           ////////////////////////////////////////
@@ -1978,7 +1807,7 @@ public class NFAITCase extends TestLogger {
 			public boolean filter(Event value) throws Exception {
 				return value.getName().equals("c");
 			}
-		}).followedBy("middle").where(new SimpleCondition<Event>() {
+		}).followedByAny("middle").where(new SimpleCondition<Event>() {
 			private static final long serialVersionUID = 5726188262756267490L;
 
 			@Override
@@ -2025,7 +1854,7 @@ public class NFAITCase extends TestLogger {
 			public boolean filter(Event value) throws Exception {
 				return value.getName().equals("c");
 			}
-		}).followedBy("middle").where(new SimpleCondition<Event>() {
+		}).followedByAny("middle").where(new SimpleCondition<Event>() {
 			private static final long serialVersionUID = 5726188262756267490L;
 
 			@Override
@@ -2071,7 +1900,7 @@ public class NFAITCase extends TestLogger {
 			public boolean filter(Event value) throws Exception {
 				return value.getName().equals("c");
 			}
-		}).followedBy("middle").where(new SimpleCondition<Event>() {
+		}).followedByAny("middle").where(new SimpleCondition<Event>() {
 			private static final long serialVersionUID = 5726188262756267490L;
 
 			@Override
@@ -2098,6 +1927,54 @@ public class NFAITCase extends TestLogger {
 	}
 
 	@Test
+	public void testOneOrMoreStrictOptional() {
+		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+		inputEvents.add(new StreamRecord<>(ConsecutiveData.startEvent, 1));
+		inputEvents.add(new StreamRecord<>(new Event(23, "f", 1.0), 2));
+		inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent1, 3));
+		inputEvents.add(new StreamRecord<>(new Event(23, "f", 1.0), 4));
+		inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent2, 5));
+		inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent3, 6));
+		inputEvents.add(new StreamRecord<>(ConsecutiveData.end, 7));
+
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("c");
+			}
+		}).followedByAny("middle").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("a");
+			}
+		}).oneOrMore().consecutive().optional().followedBy("end1").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("b");
+			}
+		});
+
+		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+		List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
+
+		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
+				Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent2, ConsecutiveData.middleEvent3, ConsecutiveData.end),
+				Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.end),
+				Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent2, ConsecutiveData.end),
+				Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent3, ConsecutiveData.end),
+				Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.end)
+		));
+	}
+
+	@Test
 	public void testTimesStrictOptional1() {
 		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
 
@@ -2201,94 +2078,92 @@ public class NFAITCase extends TestLogger {
 	}
 
 	@Test
-	public void testStrictCombinationsOneOrMore() {
-		List<List<Event>> resultingPatterns = testStrictOneOrMore(false);
+	public void testStrictOneOrMore() {
+		List<List<Event>> resultingPatterns = testOneOrMore(Quantifier.ConsumingStrategy.STRICT);
 
 		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
 			Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, ConsecutiveData.middleEvent3, ConsecutiveData.end),
 			Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, ConsecutiveData.end),
-			Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.end),
-			Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent2, ConsecutiveData.middleEvent3, ConsecutiveData.end),
-			Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent2, ConsecutiveData.end),
-			Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent3, ConsecutiveData.end),
-			Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent4, ConsecutiveData.end)
+			Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.end)
+		));
+	}
+
+	@Test
+	public void testSkipTillNextOneOrMore() {
+		List<List<Event>> resultingPatterns = testOneOrMore(Quantifier.ConsumingStrategy.SKIP_TILL_NEXT);
+
+		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
+			Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, ConsecutiveData.middleEvent3, ConsecutiveData.middleEvent4, ConsecutiveData.end),
+			Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, ConsecutiveData.middleEvent3, ConsecutiveData.end),
+			Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, ConsecutiveData.end),
+			Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.end)
 		));
 	}
 
 	@Test
-	public void testStrictEagerOneOrMore() {
-		List<List<Event>> resultingPatterns = testStrictOneOrMore(true);
+	public void testSkipTillAnyOneOrMore() {
+		List<List<Event>> resultingPatterns = testOneOrMore(Quantifier.ConsumingStrategy.SKIP_TILL_ANY);
 
 		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
+			Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, ConsecutiveData.middleEvent3, ConsecutiveData.middleEvent4, ConsecutiveData.end),
+			Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent3, ConsecutiveData.middleEvent4, ConsecutiveData.end),
 			Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, ConsecutiveData.middleEvent3, ConsecutiveData.end),
+			Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, ConsecutiveData.middleEvent4, ConsecutiveData.end),
 			Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, ConsecutiveData.end),
+			Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent3, ConsecutiveData.end),
+			Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent4, ConsecutiveData.end),
 			Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.end)
 		));
 	}
 
-	private List<List<Event>> testStrictOneOrMore(boolean eager) {
+	private List<List<Event>> testOneOrMore(Quantifier.ConsumingStrategy strategy) {
 		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
 
 		inputEvents.add(new StreamRecord<>(ConsecutiveData.startEvent, 1));
 		inputEvents.add(new StreamRecord<>(new Event(50, "d", 6.0), 2));
 		inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent1, 3));
 		inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent2, 4));
-		inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent3, 5));
-		inputEvents.add(new StreamRecord<>(new Event(50, "d", 6.0), 6));
-		inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent4, 7));
-		inputEvents.add(new StreamRecord<>(ConsecutiveData.end, 8));
-
-		Pattern<Event, ?> pattern;
-		if (eager) {
-			pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
-				private static final long serialVersionUID = 5726188262756267490L;
-
-				@Override
-				public boolean filter(Event value) throws Exception {
-					return value.getName().equals("c");
-				}
-			}).followedBy("middle").where(new SimpleCondition<Event>() {
-				private static final long serialVersionUID = 5726188262756267490L;
-
-				@Override
-				public boolean filter(Event value) throws Exception {
-					return value.getName().equals("a");
-				}
-			}).oneOrMore().consecutive()
-					.followedBy("end1").where(new SimpleCondition<Event>() {
-						private static final long serialVersionUID = 5726188262756267490L;
+		inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent3, 4));
+		inputEvents.add(new StreamRecord<>(new Event(50, "d", 6.0), 5));
+		inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent4, 6));
+		inputEvents.add(new StreamRecord<>(ConsecutiveData.end, 7));
 
-						@Override
-						public boolean filter(Event value) throws Exception {
-							return value.getName().equals("b");
-						}
-					});
-		} else {
-			pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
-				private static final long serialVersionUID = 5726188262756267490L;
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
 
-				@Override
-				public boolean filter(Event value) throws Exception {
-					return value.getName().equals("c");
-				}
-			}).followedBy("middle").where(new SimpleCondition<Event>() {
-				private static final long serialVersionUID = 5726188262756267490L;
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("c");
+			}
+		}).followedBy("middle").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
 
-				@Override
-				public boolean filter(Event value) throws Exception {
-					return value.getName().equals("a");
-				}
-			}).oneOrMore().allowCombinations().consecutive()
-					.followedBy("end1").where(new SimpleCondition<Event>() {
-						private static final long serialVersionUID = 5726188262756267490L;
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("a");
+			}
+		}).oneOrMore();
 
-						@Override
-						public boolean filter(Event value) throws Exception {
-							return value.getName().equals("b");
-						}
-					});
+		switch (strategy) {
+			case STRICT:
+				pattern = pattern.consecutive();
+				break;
+			case SKIP_TILL_NEXT:
+				break;
+			case SKIP_TILL_ANY:
+				pattern = pattern.allowCombinations();
+				break;
 		}
 
+		pattern = pattern.followedBy("end1").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("b");
+			}
+		});
+
 		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
 
 		return feedNFA(inputEvents, nfa);
@@ -2296,9 +2171,10 @@ public class NFAITCase extends TestLogger {
 
 	@Test
 	public void testStrictEagerZeroOrMore() {
-		List<List<Event>> resultingPatterns = testStrictZeroOrMore(true);
+		List<List<Event>> resultingPatterns = testZeroOrMore(Quantifier.ConsumingStrategy.STRICT);
 
 		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
+			Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, ConsecutiveData.middleEvent3, ConsecutiveData.end),
 			Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, ConsecutiveData.end),
 			Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.end),
 			Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.end)
@@ -2306,67 +2182,75 @@ public class NFAITCase extends TestLogger {
 	}
 
 	@Test
-	public void testStrictCombinationsZeroOrMore() {
-		List<List<Event>> resultingPatterns = testStrictZeroOrMore(false);
+	public void testSkipTillAnyZeroOrMore() {
+		List<List<Event>> resultingPatterns = testZeroOrMore(Quantifier.ConsumingStrategy.SKIP_TILL_ANY);
 
 		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
+			Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, ConsecutiveData.middleEvent3, ConsecutiveData.middleEvent4, ConsecutiveData.end),
+			Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, ConsecutiveData.middleEvent4, ConsecutiveData.end),
+			Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent3, ConsecutiveData.middleEvent4, ConsecutiveData.end),
+			Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent4, ConsecutiveData.end),
+			Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, ConsecutiveData.middleEvent3, ConsecutiveData.end),
 			Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, ConsecutiveData.end),
+			Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent3, ConsecutiveData.end),
 			Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.end),
-			Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent2, ConsecutiveData.end),
-			Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent3, ConsecutiveData.end),
 			Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.end)
 		));
 	}
 
-	private List<List<Event>> testStrictZeroOrMore(boolean eager) {
+	@Test
+	public void testSkipTillNextZeroOrMore() {
+		List<List<Event>> resultingPatterns = testZeroOrMore(Quantifier.ConsumingStrategy.SKIP_TILL_NEXT);
+
+		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
+			Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, ConsecutiveData.middleEvent3, ConsecutiveData.middleEvent4, ConsecutiveData.end),
+			Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, ConsecutiveData.middleEvent3, ConsecutiveData.end),
+			Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, ConsecutiveData.end),
+			Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.end),
+			Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.end)
+		));
+	}
+
+	private List<List<Event>> testZeroOrMore(Quantifier.ConsumingStrategy strategy) {
 		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
 
 		inputEvents.add(new StreamRecord<>(ConsecutiveData.startEvent, 1));
 		inputEvents.add(new StreamRecord<>(new Event(50, "d", 6.0), 2));
 		inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent1, 3));
 		inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent2, 4));
+		inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent3, 4));
 		inputEvents.add(new StreamRecord<>(new Event(50, "d", 6.0), 5));
-		inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent3, 6));
+		inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent4, 6));
 		inputEvents.add(new StreamRecord<>(ConsecutiveData.end, 7));
 
-		Pattern<Event, ?> pattern = eager
-				? Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
-					private static final long serialVersionUID = 5726188262756267490L;
-
-					@Override
-					public boolean filter(Event value) throws Exception {
-						return value.getName().equals("c");
-					}
-				}).followedBy("middle").where(new SimpleCondition<Event>() {
-					private static final long serialVersionUID = 5726188262756267490L;
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
 
-					@Override
-					public boolean filter(Event value) throws Exception {
-						return value.getName().equals("a");
-					}
-				}).oneOrMore().optional().consecutive().followedBy("end1").where(new SimpleCondition<Event>() {
-					private static final long serialVersionUID = 5726188262756267490L;
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("c");
+			}
+		}).followedBy("middle").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
 
-					@Override
-					public boolean filter(Event value) throws Exception {
-						return value.getName().equals("b");
-					}
-				})
-				: Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
-					private static final long serialVersionUID = 5726188262756267490L;
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("a");
+			}
+		}).oneOrMore().optional();
 
-					@Override
-					public boolean filter(Event value) throws Exception {
-						return value.getName().equals("c");
-					}
-				}).followedBy("middle").where(new SimpleCondition<Event>() {
-					private static final long serialVersionUID = 5726188262756267490L;
+		switch (strategy) {
+			case STRICT:
+				pattern = pattern.consecutive();
+				break;
+			case SKIP_TILL_NEXT:
+				break;
+			case SKIP_TILL_ANY:
+				pattern = pattern.allowCombinations();
+				break;
+		}
 
-					@Override
-					public boolean filter(Event value) throws Exception {
-						return value.getName().equals("a");
-					}
-				}).oneOrMore().allowCombinations().optional().consecutive().followedBy("end1").where(new SimpleCondition<Event>() {
+		pattern = pattern.followedBy("end1").where(new SimpleCondition<Event>() {
 					private static final long serialVersionUID = 5726188262756267490L;
 
 					@Override
@@ -2380,7 +2264,6 @@ public class NFAITCase extends TestLogger {
 		return feedNFA(inputEvents, nfa);
 	}
 
-
 	@Test
 	public void testTimesStrict() {
 		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
@@ -2400,14 +2283,14 @@ public class NFAITCase extends TestLogger {
 			public boolean filter(Event value) throws Exception {
 				return value.getName().equals("c");
 			}
-		}).followedBy("middle").where(new SimpleCondition<Event>() {
+		}).followedByAny("middle").where(new SimpleCondition<Event>() {
 			private static final long serialVersionUID = 5726188262756267490L;
 
 			@Override
 			public boolean filter(Event value) throws Exception {
 				return value.getName().equals("a");
 			}
-		}).times(2).allowCombinations().consecutive().followedBy("end1").where(new SimpleCondition<Event>() {
+		}).times(2).consecutive().followedBy("end1").where(new SimpleCondition<Event>() {
 			private static final long serialVersionUID = 5726188262756267490L;
 
 			@Override
@@ -2444,7 +2327,7 @@ public class NFAITCase extends TestLogger {
 			public boolean filter(Event value) throws Exception {
 				return value.getName().equals("c");
 			}
-		}).followedBy("middle").where(new SimpleCondition<Event>() {
+		}).followedByAny("middle").where(new SimpleCondition<Event>() {
 			private static final long serialVersionUID = 5726188262756267490L;
 
 			@Override
@@ -2733,16 +2616,10 @@ public class NFAITCase extends TestLogger {
 				Lists.<List<Event>>newArrayList(
 						Lists.newArrayList(startEvent1, endEvent, middleEvent1, middleEvent2, middleEvent4),
 						Lists.newArrayList(startEvent1, endEvent, middleEvent2, middleEvent1),
-						Lists.newArrayList(startEvent1, endEvent, middleEvent4, middleEvent3),
-						Lists.newArrayList(startEvent1, endEvent, middleEvent4, middleEvent2),
 						Lists.newArrayList(startEvent1, endEvent, middleEvent3, middleEvent1),
 						Lists.newArrayList(startEvent2, endEvent, middleEvent3, middleEvent4),
 						Lists.newArrayList(startEvent1, endEvent, middleEvent4, middleEvent1),
-						Lists.newArrayList(startEvent1, endEvent, middleEvent4),
 						Lists.newArrayList(startEvent1, endEvent, middleEvent1),
-						Lists.newArrayList(startEvent1, endEvent, middleEvent2),
-						Lists.newArrayList(startEvent1, endEvent, middleEvent3),
-						Lists.newArrayList(startEvent2, endEvent, middleEvent4),
 						Lists.newArrayList(startEvent2, endEvent, middleEvent3)
 				)
 		);
@@ -2798,9 +2675,7 @@ public class NFAITCase extends TestLogger {
 
 		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
 
-		List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
-
-		return resultingPatterns;
+		return feedNFA(inputEvents, nfa);
 	}
 
 	private static class MySubeventIterCondition extends IterativeCondition<SubEvent> {
@@ -2887,9 +2762,7 @@ public class NFAITCase extends TestLogger {
 
 		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
 
-		List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
-
-		return resultingPatterns;
+		return feedNFA(inputEvents, nfa);
 	}
 
 	private static class MyEventIterCondition extends IterativeCondition<Event> {
@@ -2978,7 +2851,7 @@ public class NFAITCase extends TestLogger {
 			public boolean filter(Event value) throws Exception {
 				return value.getName().equals("start");
 			}
-		}).followedBy("middle1").subtype(SubEvent.class).where(new SimpleCondition<SubEvent>() {
+		}).followedByAny("middle1").subtype(SubEvent.class).where(new SimpleCondition<SubEvent>() {
 			private static final long serialVersionUID = 2178338526904474690L;
 
 			@Override
@@ -3043,14 +2916,14 @@ public class NFAITCase extends TestLogger {
 			public boolean filter(Event value) throws Exception {
 				return value.getName().equals("start");
 			}
-		}).oneOrMore().followedBy("middle1").subtype(SubEvent.class).where(new SimpleCondition<SubEvent>() {
+		}).oneOrMore().followedByAny("middle1").subtype(SubEvent.class).where(new SimpleCondition<SubEvent>() {
 			private static final long serialVersionUID = 2178338526904474690L;
 
 			@Override
 			public boolean filter(SubEvent value) throws Exception {
 				return value.getName().startsWith("foo");
 			}
-		}).followedBy("end").where(new IterativeCondition<Event>() {
+		}).followedByAny("end").where(new IterativeCondition<Event>() {
 			private static final long serialVersionUID = 7056763917392056548L;
 
 			@Override
@@ -3084,6 +2957,131 @@ public class NFAITCase extends TestLogger {
 		);
 	}
 
+
+	///////////////////////////////////////   Skip till next     /////////////////////////////
+
+	@Test
+	public void testBranchingPatternSkipTillNext() {
+		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+		Event startEvent = new Event(40, "start", 1.0);
+		SubEvent middleEvent1 = new SubEvent(41, "foo1", 1.0, 10.0);
+		SubEvent middleEvent2 = new SubEvent(42, "foo2", 1.0, 10.0);
+		SubEvent middleEvent3 = new SubEvent(43, "foo3", 1.0, 10.0);
+		SubEvent nextOne1 = new SubEvent(44, "next-one", 1.0, 2.0);
+		SubEvent nextOne2 = new SubEvent(45, "next-one", 1.0, 2.0);
+		Event endEvent = new Event(46, "end", 1.0);
+
+		inputEvents.add(new StreamRecord<>(startEvent, 1));
+		inputEvents.add(new StreamRecord<Event>(middleEvent1, 3));
+		inputEvents.add(new StreamRecord<Event>(middleEvent2, 4));
+		inputEvents.add(new StreamRecord<Event>(middleEvent3, 5));
+		inputEvents.add(new StreamRecord<Event>(nextOne1, 6));
+		inputEvents.add(new StreamRecord<Event>(nextOne2, 7));
+		inputEvents.add(new StreamRecord<>(endEvent, 8));
+
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("start");
+			}
+		}).followedBy("middle-first").subtype(SubEvent.class).where(new SimpleCondition<SubEvent>() {
+			private static final long serialVersionUID = 6215754202506583964L;
+
+			@Override
+			public boolean filter(SubEvent value) throws Exception {
+				return value.getVolume() > 5.0;
+			}
+		}).followedBy("middle-second").subtype(SubEvent.class).where(new SimpleCondition<SubEvent>() {
+			private static final long serialVersionUID = 6215754202506583964L;
+
+			@Override
+			public boolean filter(SubEvent value) throws Exception {
+				return value.getName().equals("next-one");
+			}
+		}).followedByAny("end").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 7056763917392056548L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("end");
+			}
+		});
+
+		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+		final List<List<Event>> patterns = feedNFA(inputEvents, nfa);
+
+		compareMaps(patterns, Lists.<List<Event>>newArrayList(
+			Lists.newArrayList(startEvent, middleEvent1, nextOne1, endEvent)
+		));
+	}
+
+	@Test
+	public void testBranchingPatternMixedFollowedBy() {
+		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+		Event startEvent = new Event(40, "start", 1.0);
+		SubEvent middleEvent1 = new SubEvent(41, "foo1", 1.0, 10.0);
+		SubEvent middleEvent2 = new SubEvent(42, "foo2", 1.0, 10.0);
+		SubEvent middleEvent3 = new SubEvent(43, "foo3", 1.0, 10.0);
+		SubEvent nextOne1 = new SubEvent(44, "next-one", 1.0, 2.0);
+		SubEvent nextOne2 = new SubEvent(45, "next-one", 1.0, 2.0);
+		Event endEvent = new Event(46, "end", 1.0);
+
+		inputEvents.add(new StreamRecord<>(startEvent, 1));
+		inputEvents.add(new StreamRecord<Event>(middleEvent1, 3));
+		inputEvents.add(new StreamRecord<Event>(middleEvent2, 4));
+		inputEvents.add(new StreamRecord<Event>(middleEvent3, 5));
+		inputEvents.add(new StreamRecord<Event>(nextOne1, 6));
+		inputEvents.add(new StreamRecord<Event>(nextOne2, 7));
+		inputEvents.add(new StreamRecord<>(endEvent, 8));
+
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("start");
+			}
+		}).followedByAny("middle-first").subtype(SubEvent.class).where(new SimpleCondition<SubEvent>() {
+			private static final long serialVersionUID = 6215754202506583964L;
+
+			@Override
+			public boolean filter(SubEvent value) throws Exception {
+				return value.getVolume() > 5.0;
+			}
+		}).followedBy("middle-second").subtype(SubEvent.class).where(new SimpleCondition<SubEvent>() {
+			private static final long serialVersionUID = 6215754202506583964L;
+
+			@Override
+			public boolean filter(SubEvent value) throws Exception {
+				return value.getName().equals("next-one");
+			}
+		}).followedByAny("end").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 7056763917392056548L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("end");
+			}
+		});
+
+		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+		final List<List<Event>> patterns = feedNFA(inputEvents, nfa);
+
+		compareMaps(patterns, Lists.<List<Event>>newArrayList(
+			Lists.newArrayList(startEvent, middleEvent1, nextOne1, endEvent),
+			Lists.newArrayList(startEvent, middleEvent2, nextOne1, endEvent),
+			Lists.newArrayList(startEvent, middleEvent3, nextOne1, endEvent)
+		));
+	}
+
+
+	/////////////////////////////////////////    Utility      ////////////////////////////////
 	private List<List<Event>> feedNFA(List<StreamRecord<Event>> inputEvents, NFA<Event> nfa) {
 		List<List<Event>> resultingPatterns = new ArrayList<>();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7c35dc0e/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java
index da1f9c8..ced9efe 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java
@@ -149,45 +149,6 @@ public class NFACompilerTest extends TestLogger {
 		assertEquals(0, endingState.getStateTransitions().size());
 	}
 
-	@Test
-	public void testNFACompilerWithKleeneStar() {
-
-		Pattern<Event, Event> pattern = Pattern.<Event>begin("start").where(startFilter)
-			.followedBy("middle").subtype(SubEvent.class).oneOrMore().optional()
-			.followedBy("end").where(endFilter);
-
-		NFA<Event> nfa = NFACompiler.compile(pattern, serializer, false);
-
-		Set<State<Event>> states = nfa.getStates();
-		assertEquals(5, states.size());
-
-
-		Set<Tuple2<String, Set<Tuple2<String, StateTransitionAction>>>> stateMap = new HashSet<>();
-		for (State<Event> state : states) {
-			stateMap.add(Tuple2.of(state.getName(), unfoldTransitions(state)));
-		}
-
-		assertEquals(stateMap, newHashSet(
-			Tuple2.of("start", newHashSet(Tuple2.of("middle", StateTransitionAction.TAKE))),
-			Tuple2.of("middle", newHashSet(
-				Tuple2.of("middle", StateTransitionAction.IGNORE),
-				Tuple2.of("middle", StateTransitionAction.TAKE)
-			)),
-		    Tuple2.of("middle", newHashSet(
-			    Tuple2.of("middle", StateTransitionAction.IGNORE),
-			    Tuple2.of("middle", StateTransitionAction.TAKE),
-			    Tuple2.of("end", StateTransitionAction.PROCEED)
-		    )),
-			Tuple2.of("end", newHashSet(
-				Tuple2.of(NFACompiler.ENDING_STATE_NAME, StateTransitionAction.TAKE),
-				Tuple2.of("end", StateTransitionAction.IGNORE)
-			)),
-		    Tuple2.of(NFACompiler.ENDING_STATE_NAME, Sets.newHashSet())
-		));
-
-	}
-
-
 	private <T> Set<Tuple2<String, StateTransitionAction>> unfoldTransitions(final State<T> state) {
 		final Set<Tuple2<String, StateTransitionAction>> transitions = new HashSet<>();
 		for (StateTransition<T> transition : state.getStateTransitions()) {

http://git-wip-us.apache.org/repos/asf/flink/blob/7c35dc0e/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
index d599ec9..8465bc3 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
@@ -208,6 +208,7 @@ public class CEPOperatorTest extends TestLogger {
 	 * Tests that the internal time of a CEP operator advances only given watermarks. See FLINK-5033
 	 */
 	@Test
+	@SuppressWarnings("unchecked")
 	public void testKeyedAdvancingTimeWithoutElements() throws Exception {
 		final KeySelector<Event, Integer> keySelector = new TestKeySelector();
 
@@ -522,7 +523,7 @@ public class CEPOperatorTest extends TestLogger {
 							return value.getName().equals("start");
 						}
 					})
-					.followedBy("middle").subtype(SubEvent.class).where(new SimpleCondition<SubEvent>() {
+					.followedByAny("middle").subtype(SubEvent.class).where(new SimpleCondition<SubEvent>() {
 						private static final long serialVersionUID = 6215754202506583964L;
 
 						@Override
@@ -530,7 +531,7 @@ public class CEPOperatorTest extends TestLogger {
 							return value.getVolume() > 5.0;
 						}
 					})
-					.followedBy("end").where(new SimpleCondition<Event>() {
+					.followedByAny("end").where(new SimpleCondition<Event>() {
 						private static final long serialVersionUID = 7056763917392056548L;
 
 						@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/7c35dc0e/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/pattern/PatternTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/pattern/PatternTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/pattern/PatternTest.java
index 8c4304a..3c1da2e 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/pattern/PatternTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/pattern/PatternTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.cep.pattern;
 
 import org.apache.flink.cep.Event;
 import org.apache.flink.cep.SubEvent;
+import org.apache.flink.cep.pattern.Quantifier.ConsumingStrategy;
 import org.apache.flink.cep.pattern.conditions.OrCondition;
 import org.apache.flink.cep.pattern.conditions.SimpleCondition;
 import org.apache.flink.cep.pattern.conditions.SubtypeCondition;
@@ -58,8 +59,8 @@ public class PatternTest extends TestLogger {
 		assertNotNull(previous2 = previous.getPrevious());
 		assertNull(previous2.getPrevious());
 
-		assertTrue(pattern instanceof FollowedByPattern);
-		assertTrue(previous instanceof FollowedByPattern);
+		assertEquals(ConsumingStrategy.SKIP_TILL_NEXT, pattern.getQuantifier().getConsumingStrategy());
+		assertEquals(ConsumingStrategy.SKIP_TILL_NEXT, previous.getQuantifier().getConsumingStrategy());
 
 		assertEquals(pattern.getName(), "end");
 		assertEquals(previous.getName(), "next");
@@ -137,7 +138,7 @@ public class PatternTest extends TestLogger {
 		assertNotNull(previous2 = previous.getPrevious());
 		assertNull(previous2.getPrevious());
 
-		assertTrue(pattern instanceof FollowedByPattern);
+		assertEquals(ConsumingStrategy.SKIP_TILL_NEXT, pattern.getQuantifier().getConsumingStrategy());
 		assertNotNull(previous.getCondition());
 
 		assertEquals(pattern.getName(), "end");
@@ -177,7 +178,7 @@ public class PatternTest extends TestLogger {
 		assertNotNull(previous2 = previous.getPrevious());
 		assertNull(previous2.getPrevious());
 
-		assertTrue(pattern instanceof FollowedByPattern);
+		assertEquals(ConsumingStrategy.SKIP_TILL_NEXT, pattern.getQuantifier().getConsumingStrategy());
 		assertFalse(previous.getCondition() instanceof OrCondition);
 		assertTrue(previous2.getCondition() instanceof OrCondition);
 


Mime
View raw message