flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kklou...@apache.org
Subject flink git commit: [FLINK-6165] [cep] Implement internal continuity for looping states.
Date Thu, 30 Mar 2017 11:22:34 GMT
Repository: flink
Updated Branches:
  refs/heads/master d4665a00a -> aa3c395b9


[FLINK-6165] [cep] Implement internal continuity for looping states.

Allows looping states (oneOrMore, zeroOrMore, times) to specify
if they want their elements to be consecutive or allow non-matching
elements in-between.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/aa3c395b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/aa3c395b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/aa3c395b

Branch: refs/heads/master
Commit: aa3c395b97943e312dd16964b363f0e4f86c6739
Parents: d4665a0
Author: Dawid Wysakowicz <dawid@getindata.com>
Authored: Mon Mar 27 15:05:11 2017 +0200
Committer: kl0u <kkloudas@gmail.com>
Committed: Thu Mar 30 10:24:19 2017 +0200

----------------------------------------------------------------------
 docs/dev/libs/cep.md                            |  66 ++++
 .../flink/cep/scala/pattern/Pattern.scala       |  29 ++
 .../java/org/apache/flink/cep/nfa/State.java    |   6 +-
 .../flink/cep/nfa/compiler/NFACompiler.java     | 189 +++++-----
 .../org/apache/flink/cep/pattern/Pattern.java   |  71 ++++
 .../apache/flink/cep/pattern/Quantifier.java    |  18 +-
 .../org/apache/flink/cep/nfa/NFAITCase.java     | 374 +++++++++++++++----
 7 files changed, 603 insertions(+), 150 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/aa3c395b/docs/dev/libs/cep.md
----------------------------------------------------------------------
diff --git a/docs/dev/libs/cep.md b/docs/dev/libs/cep.md
index 932ba30..bb704c7 100644
--- a/docs/dev/libs/cep.md
+++ b/docs/dev/libs/cep.md
@@ -396,6 +396,7 @@ patternState.within(Time.seconds(10));
           <td>
               <p>Specifies that this pattern can occur zero or more times(kleene star). This means any number of events can be matched in this state.</p>
               <p>If eagerness is enabled(by default) for a pattern A*B and sequence A1 A2 B will generate patterns: B, A1 B and A1 A2 B. If disabled B, A1 B, A2 B and A1 A2 B.</p>
+              <p>By default a relaxed internal continuity (between subsequent events of a loop) is used. For more info on the internal continuity see <a href="#consecutive_java">consecutive</a></p>
       {% highlight java %}
       patternState.zeroOrMore();
       {% endhighlight %}
@@ -406,6 +407,7 @@ patternState.within(Time.seconds(10));
           <td>
               <p>Specifies that this pattern can occur one or more times(kleene star). This means at least one and at most infinite number of events can be matched in this state.</p>
               <p>If eagerness is enabled (by default) for a pattern A*B and sequence A1 A2 B will generate patterns: A1 B and A1 A2 B. If disabled A1 B, A2 B and A1 A2 B.</p>
+              <p>By default a relaxed internal continuity (between subsequent events of a loop) is used. For more info on the internal continuity see <a href="#consecutive_java">consecutive</a></p>
       {% highlight java %}
       patternState.oneOrMore();
       {% endhighlight %}
@@ -424,11 +426,50 @@ patternState.within(Time.seconds(10));
           <td><strong>Times</strong></td>
           <td>
               <p>Specifies exact number of times that this pattern should be matched.</p>
+              <p>By default a relaxed internal continuity (between subsequent events of a loop) is used. For more info on the internal continuity see <a href="#consecutive_java">consecutive</a></p>
       {% highlight java %}
       patternState.times(2);
       {% endhighlight %}
           </td>
        </tr>
+       <tr>
+          <td><strong>Consecutive</strong><a name="consecutive_java"></a></td>
+          <td>
+              <p>Works in conjunction with zeroOrMore, oneOrMore or times. Specifies that any not matching element breaks the loop.</p>
+              
+              <p>If not applied a relaxed continuity (as in followedBy) is used.</p>
+
+          <p>E.g. a pattern like:</p>
+      {% highlight java %}
+      Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+           @Override
+           public boolean filter(Event value) throws Exception {
+               return value.getName().equals("c");
+           }
+      })
+      .followedBy("middle").where(new SimpleCondition<Event>() {
+           @Override
+           public boolean filter(Event value) throws Exception {
+               return value.getName().equals("a");
+           }
+      })
+      .oneOrMore(true).consecutive()
+      .followedBy("end1").where(new SimpleCondition<Event>() {
+           @Override
+           public boolean filter(Event value) throws Exception {
+               return value.getName().equals("b");
+           }
+      });
+      {% endhighlight %}
+
+             <p>Will generate the following matches for a sequence: C D A1 A2 A3 D A4 B</p>
+
+             <p>with consecutive applied: {C A1 B}, {C A1 A2 B}, {C A1 A2 A3 B}</p>
+             <p>without consecutive applied: {C A1 B}, {C A1 A2 B}, {C A1 A2 A3 B}, {C A1 A2 A3 A4 B}</p>
+
+             <p><b>NOTICE:</b> This option can be applied only to zeroOrMore(), oneOrMore() and times()!</p>
+          </td>
+       </tr>
   </tbody>
 </table>
 </div>
@@ -511,6 +552,7 @@ patternState.within(Time.seconds(10))
           <td>
               <p>Specifies that this pattern can occur zero or more times(kleene star). This means any number of events can be matched in this state.</p>
               <p>If eagerness is enabled(by default) for a pattern A*B and sequence A1 A2 B will generate patterns: B, A1 B and A1 A2 B. If disabled B, A1 B, A2 B and A1 A2 B.</p>
+              <p>By default a relaxed internal continuity (between subsequent events of a loop) is used. For more info on the internal continuity see <a href="#consecutive_scala">consecutive</a></p>
       {% highlight scala %}
       patternState.zeroOrMore()
       {% endhighlight %}
@@ -521,6 +563,7 @@ patternState.within(Time.seconds(10))
           <td>
               <p>Specifies that this pattern can occur one or more times(kleene star). This means at least one and at most infinite number of events can be matched in this state.</p>
               <p>If eagerness is enabled (by default) for a pattern A*B and sequence A1 A2 B will generate patterns: A1 B and A1 A2 B. If disabled A1 B, A2 B and A1 A2 B.</p>
+              <p>By default a relaxed internal continuity (between subsequent events of a loop) is used. For more info on the internal continuity see <a href="#consecutive_scala">consecutive</a></p>
       {% highlight scala %}
       patternState.oneOrMore()
       {% endhighlight %}
@@ -539,11 +582,34 @@ patternState.within(Time.seconds(10))
           <td><strong>Times</strong></td>
           <td>
               <p>Specifies exact number of times that this pattern should be matched.</p>
+              <p>By default a relaxed internal continuity (between subsequent events of a loop) is used. For more info on the internal continuity see <a href="#consecutive_scala">consecutive</a></p>
       {% highlight scala %}
       patternState.times(2)
       {% endhighlight %}
           </td>
        </tr>
+       <tr>
+          <td><strong>Consecutive</strong><a name="consecutive_scala"></a></td>
+          <td>
+            <p>Works in conjunction with zeroOrMore, oneOrMore or times. Specifies that any not matching element breaks the loop.</p>
+            
+            <p>If not applied a relaxed continuity (as in followedBy) is used.</p>
+            
+      {% highlight scala %}
+      Pattern.begin("start").where(_.getName().equals("c"))
+       .followedBy("middle").where(_.getName().equals("a"))
+                            .oneOrMore(true).consecutive()
+       .followedBy("end1").where(_.getName().equals("b"));
+      {% endhighlight %}
+
+            <p>Will generate the following matches for a sequence: C D A1 A2 A3 D A4 B</p>
+
+            <p>with consecutive applied: {C A1 B}, {C A1 A2 B}, {C A1 A2 A3 B}</p>
+            <p>without consecutive applied: {C A1 B}, {C A1 A2 B}, {C A1 A2 A3 B}, {C A1 A2 A3 A4 B}</p>
+
+            <p><b>NOTICE:</b> This option can be applied only to zeroOrMore(), oneOrMore() and times()!</p>
+          </td>
+       </tr>
   </tbody>
 </table>
 </div>

http://git-wip-us.apache.org/repos/asf/flink/blob/aa3c395b/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala b/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala
index 07dfc5a..c636029 100644
--- a/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala
+++ b/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala
@@ -270,6 +270,35 @@ class Pattern[T , F <: T](jPattern: JPattern[T, F]) {
     this
   }
 
+
+  /**
+    * Works in conjunction with [[org.apache.flink.cep.scala.pattern.Pattern#zeroOrMore()]],
+    * [[org.apache.flink.cep.scala.pattern.Pattern#oneOrMore()]] or
+    * [[org.apache.flink.cep.scala.pattern.Pattern#times(int)]].
+    * Specifies that any not matching element breaks the loop.
+    *
+    * <p>E.g. a pattern like:
+    * {{{
+    * Pattern.begin("start").where(_.getName().equals("c"))
+    *        .followedBy("middle").where(_.getName().equals("a")).oneOrMore(true).consecutive()
+    *        .followedBy("end1").where(_.getName().equals("b"));
+    * }}}
+    *
+    * <p>for a sequence: C D A1 A2 A3 D A4 B
+    *
+    * <p>will generate matches: {C A1 B}, {C A1 A2 B}, {C A1 A2 A3 B}
+    *
+    * <p><b>NOTICE:</b> This operator can be applied only when either zeroOrMore,
+    * oneOrMore or times was previously applied!
+    *
+    * <p>By default a relaxed continuity is applied.
+    * @return pattern with continuity changed to strict
+    */
+  def consecutive(): Pattern[T, F] = {
+    jPattern.consecutive()
+    this
+  }
+
 }
 
 object Pattern {

http://git-wip-us.apache.org/repos/asf/flink/blob/aa3c395b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/State.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/State.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/State.java
index c673576..2503ffd 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/State.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/State.java
@@ -41,7 +41,7 @@ public class State<T> implements Serializable {
 	private static final long serialVersionUID = 6658700025989097781L;
 
 	private final String name;
-	private final StateType stateType;
+	private StateType stateType;
 	private final Collection<StateTransition<T>> stateTransitions;
 
 	public State(final String name, final StateType stateType) {
@@ -65,6 +65,10 @@ public class State<T> implements Serializable {
 		return stateTransitions;
 	}
 
+	public void makeStart() {
+		this.stateType = StateType.Start;
+	}
+
 	private void addStateTransition(
 			final StateTransitionAction action,
 			final State<T> targetState,

http://git-wip-us.apache.org/repos/asf/flink/blob/aa3c395b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
index 4fb918f..e441c4b 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
@@ -152,6 +152,7 @@ public class NFACompiler {
 
 		/**
 		 * Creates all the states between Start and Final state.
+		 *
 		 * @param sinkState the state that last state should point to (always the Final state)
 		 * @return the next state after Start in the resulting graph
 		 */
@@ -160,27 +161,25 @@ public class NFACompiler {
 			State<T> lastSink = sinkState;
 			while (currentPattern.getPrevious() != null) {
 				checkPatternNameUniqueness();
-
-				State<T> sourceState = new State<>(currentPattern.getName(), State.StateType.Normal);
-				states.add(sourceState);
-				usedNames.add(sourceState.getName());
+				usedNames.add(currentPattern.getName());
 
 				if (currentPattern.getQuantifier().hasProperty(QuantifierProperty.LOOPING)) {
-					convertToLooping(sourceState, lastSink);
+					final State<T> looping = createLooping(lastSink);
 
 					if (currentPattern.getQuantifier().hasProperty(QuantifierProperty.AT_LEAST_ONE)) {
-						sourceState = createFirstMandatoryStateOfLoop(sourceState, State.StateType.Normal);
-						states.add(sourceState);
-						usedNames.add(sourceState.getName());
+						lastSink = createFirstMandatoryStateOfLoop(looping);
+					} else if (currentPattern instanceof FollowedByPattern &&
+								currentPattern.getQuantifier().hasProperty(QuantifierProperty.STRICT)) {
+						lastSink = createWaitingStateForZeroOrMore(looping, lastSink);
+					} else {
+						lastSink = looping;
 					}
-				} else if (currentPattern.getQuantifier() == Quantifier.TIMES) {
-					sourceState = convertToTimesState(sourceState, lastSink, currentPattern.getTimes());
+				} else if (currentPattern.getQuantifier().hasProperty(QuantifierProperty.TIMES)) {
+					lastSink = createTimesState(lastSink, currentPattern.getTimes());
 				} else {
-					convertToSingletonState(sourceState, lastSink);
+					lastSink = createSingletonState(lastSink);
 				}
-
 				currentPattern = currentPattern.getPrevious();
-				lastSink = sourceState;
 
 				final Time currentWindowTime = currentPattern.getWindowTime();
 				if (currentWindowTime != null && currentWindowTime.toMilliseconds() < windowTime) {
@@ -192,6 +191,30 @@ public class NFACompiler {
 			return lastSink;
 		}
 
+		/**
+		 * Creates a pair of states that enables relaxed strictness before a zeroOrMore looping state.
+		 *
+		 * @param loopingState the first state of zeroOrMore complex state
+		 * @param lastSink     the state that the looping one points to
+		 * @return the newly created state
+		 */
+		private State<T> createWaitingStateForZeroOrMore(final State<T> loopingState, final State<T> lastSink) {
+			final State<T> followByState = createNormalState();
+			final State<T> followByStateWithoutProceed = createNormalState();
+
+			final IterativeCondition<T> currentFunction = (IterativeCondition<T>)currentPattern.getCondition();
+			final IterativeCondition<T> ignoreFunction = getIgnoreCondition(currentPattern);
+
+			followByState.addProceed(lastSink, BooleanConditions.<T>trueFunction());
+			followByState.addIgnore(followByStateWithoutProceed, ignoreFunction);
+			followByState.addTake(loopingState, currentFunction);
+
+			followByStateWithoutProceed.addIgnore(ignoreFunction);
+			followByStateWithoutProceed.addTake(loopingState, currentFunction);
+
+			return followByState;
+		}
+
 		private void checkPatternNameUniqueness() {
 			if (usedNames.contains(currentPattern.getName())) {
 				throw new MalformedPatternException(
@@ -202,110 +225,112 @@ public class NFACompiler {
 
 		/**
 		 * Creates the Start {@link State} of the resulting NFA graph.
+		 *
 		 * @param sinkState the state that Start state should point to (alwyas first state of middle states)
 		 * @return created state
 		 */
 		@SuppressWarnings("unchecked")
 		private State<T> createStartState(State<T> sinkState) {
 			checkPatternNameUniqueness();
+			usedNames.add(currentPattern.getName());
 
 			final State<T> beginningState;
 			if (currentPattern.getQuantifier().hasProperty(QuantifierProperty.LOOPING)) {
-				final State<T> loopingState;
+				final State<T> loopingState = createLooping(sinkState);
 				if (currentPattern.getQuantifier().hasProperty(QuantifierProperty.AT_LEAST_ONE)) {
-					loopingState = new State<>(currentPattern.getName(), State.StateType.Normal);
-					beginningState = createFirstMandatoryStateOfLoop(loopingState, State.StateType.Start);
-					states.add(loopingState);
+					beginningState = createFirstMandatoryStateOfLoop(loopingState);
 				} else {
-					loopingState = new State<>(currentPattern.getName(), State.StateType.Start);
 					beginningState = loopingState;
 				}
-				convertToLooping(loopingState, sinkState, true);
-			} else  {
-				if (currentPattern.getQuantifier() == Quantifier.TIMES && currentPattern.getTimes() > 1) {
-					final State<T> timesState = new State<>(currentPattern.getName(), State.StateType.Normal);
-					states.add(timesState);
-					sinkState = convertToTimesState(timesState, sinkState, currentPattern.getTimes() - 1);
-				}
-
-				beginningState = new State<>(currentPattern.getName(), State.StateType.Start);
-				convertToSingletonState(beginningState, sinkState);
+			} else if (currentPattern.getQuantifier().hasProperty(QuantifierProperty.TIMES)) {
+				beginningState = createTimesState(sinkState, currentPattern.getTimes());
+			} else {
+				beginningState = createSingletonState(sinkState);
 			}
 
-			states.add(beginningState);
-			usedNames.add(beginningState.getName());
+			beginningState.makeStart();
 
 			return beginningState;
 		}
 
 		/**
-		 * Converts the given state into a "complex" state consisting of given number of states with
+		 * Creates a "complex" state consisting of given number of states with
 		 * same {@link IterativeCondition}
 		 *
-		 * @param sourceState the state to be converted
-		 * @param sinkState the state that the converted state should point to
-		 * @param times number of times the state should be copied
+		 * @param sinkState the state that the created state should point to
+		 * @param times     number of times the state should be copied
 		 * @return the first state of the "complex" state, next state should point to it
 		 */
-		private State<T> convertToTimesState(final State<T> sourceState, final State<T> sinkState, int times) {
-			convertToSingletonState(sourceState, sinkState);
-			State<T> lastSink;
-			State<T> firstState = sourceState;
+		private State<T> createTimesState(final State<T> sinkState, int times) {
+			State<T> lastSink = sinkState;
 			for (int i = 0; i < times - 1; i++) {
-				lastSink = firstState;
-				firstState = new State<>(currentPattern.getName(), State.StateType.Normal);
-				states.add(firstState);
-				convertToSingletonState(firstState, lastSink);
+				lastSink = createSingletonState(
+					lastSink,
+					currentPattern instanceof FollowedByPattern &&
+					!currentPattern.getQuantifier().hasProperty(QuantifierProperty.STRICT));
 			}
-			return firstState;
+			return createSingletonState(lastSink, currentPattern instanceof FollowedByPattern);
 		}
 
 		/**
-		 * Converts the given state into a simple single state. For an OPTIONAL state it also consists
+		 * Creates a simple single state. For an OPTIONAL state it also consists
 		 * of a similar state without the PROCEED edge, so that for each PROCEED transition branches
 		 * in computation state graph  can be created only once.
 		 *
-		 * @param sourceState the state to be converted
 		 * @param sinkState state that the state being converted should point to
+		 * @return the created state
 		 */
 		@SuppressWarnings("unchecked")
-		private void convertToSingletonState(final State<T> sourceState, final State<T> sinkState) {
+		private State<T> createSingletonState(final State<T> sinkState) {
+			return createSingletonState(sinkState, currentPattern instanceof FollowedByPattern);
+		}
 
+		/**
+		 * Creates a simple single state. For an OPTIONAL state it also consists
+		 * of a similar state without the PROCEED edge, so that for each PROCEED transition branches
+		 * in computation state graph  can be created only once.
+		 *
+		 * @param addIgnore if any IGNORE should be added
+		 * @param sinkState state that the state being converted should point to
+		 * @return the created state
+		 */
+		@SuppressWarnings("unchecked")
+		private State<T> createSingletonState(final State<T> sinkState, boolean addIgnore) {
 			final IterativeCondition<T> currentFilterFunction = (IterativeCondition<T>) currentPattern.getCondition();
 			final IterativeCondition<T> trueFunction = BooleanConditions.trueFunction();
 
-			sourceState.addTake(sinkState, currentFilterFunction);
+			final State<T> singletonState = createNormalState();
+			singletonState.addTake(sinkState, currentFilterFunction);
 
 			if (currentPattern.getQuantifier() == Quantifier.OPTIONAL) {
-				sourceState.addProceed(sinkState, trueFunction);
+				singletonState.addProceed(sinkState, trueFunction);
 			}
 
-			if (currentPattern instanceof FollowedByPattern) {
+			if (addIgnore) {
 				final State<T> ignoreState;
 				if (currentPattern.getQuantifier() == Quantifier.OPTIONAL) {
-					ignoreState = new State<>(currentPattern.getName(), State.StateType.Normal);
+					ignoreState = createNormalState();
 					ignoreState.addTake(sinkState, currentFilterFunction);
-					states.add(ignoreState);
 				} else {
-					ignoreState = sourceState;
+					ignoreState = singletonState;
 				}
-				sourceState.addIgnore(ignoreState, trueFunction);
+				singletonState.addIgnore(ignoreState, trueFunction);
 			}
+			return singletonState;
 		}
 
 		/**
-		 * Patterns with quantifiers AT_LEAST_ONE_* are converted into pair of states: a singleton state and
+		 * Patterns with quantifiers AT_LEAST_ONE_* are created as a pair of states: a singleton state and
 		 * looping state. This method creates the first of the two.
 		 *
 		 * @param sinkState the state the newly created state should point to, it should be a looping state
-		 * @param stateType the type of the created state, as the NFA graph can also start wit AT_LEAST_ONE_*
 		 * @return the newly created state
 		 */
 		@SuppressWarnings("unchecked")
-		private State<T> createFirstMandatoryStateOfLoop(final State<T> sinkState, final State.StateType stateType) {
+		private State<T> createFirstMandatoryStateOfLoop(final State<T> sinkState) {
 
 			final IterativeCondition<T> currentFilterFunction = (IterativeCondition<T>) currentPattern.getCondition();
-			final State<T> firstState = new State<>(currentPattern.getName(), stateType);
+			final State<T> firstState = createNormalState();
 
 			firstState.addTake(sinkState, currentFilterFunction);
 			if (currentPattern instanceof FollowedByPattern) {
@@ -316,49 +341,45 @@ public class NFACompiler {
 		}
 
 		/**
-		 * Converts the given state into looping one. Looping state is one with TAKE edge to itself and
+		 * Creates the given state as a looping one. Looping state is one with TAKE edge to itself and
 		 * PROCEED edge to the sinkState. It also consists of a similar state without the PROCEED edge, so that
 		 * for each PROCEED transition branches in computation state graph  can be created only once.
 		 *
-		 * <p>If this looping state is first of a graph we should treat the {@link Pattern} as {@link FollowedByPattern}
-		 * to enable combinations.
-		 *
-		 * @param sourceState  the state to converted
-		 * @param sinkState    the state that the converted state should point to
-		 * @param isFirstState if the looping state is first of a graph
+		 * @param sinkState the state that the converted state should point to
+		 * @return the first state of the created complex state
 		 */
 		@SuppressWarnings("unchecked")
-		private void convertToLooping(final State<T> sourceState, final State<T> sinkState, boolean isFirstState) {
+		private State<T> createLooping(final State<T> sinkState) {
 
+			final State<T> loopingState = createNormalState();
 			final IterativeCondition<T> filterFunction = (IterativeCondition<T>) currentPattern.getCondition();
-			final IterativeCondition<T> trueFunction = BooleanConditions.<T>trueFunction();
+			final IterativeCondition<T> trueFunction = BooleanConditions.trueFunction();
 
-			sourceState.addProceed(sinkState, trueFunction);
-			sourceState.addTake(filterFunction);
-			if (currentPattern instanceof FollowedByPattern || isFirstState) {
-				final State<T> ignoreState = new State<>(
-					currentPattern.getName(),
-					State.StateType.Normal);
+			loopingState.addProceed(sinkState, trueFunction);
+			loopingState.addTake(filterFunction);
+			if (!currentPattern.getQuantifier().hasProperty(QuantifierProperty.STRICT)) {
+				final State<T> ignoreState = createNormalState();
 
 				final IterativeCondition<T> ignoreCondition = getIgnoreCondition(currentPattern);
 
-				sourceState.addIgnore(ignoreState, ignoreCondition);
-				ignoreState.addTake(sourceState, filterFunction);
-				ignoreState.addIgnore(ignoreState, ignoreCondition);
-				states.add(ignoreState);
+				ignoreState.addTake(loopingState, filterFunction);
+				ignoreState.addIgnore(ignoreCondition);
+				loopingState.addIgnore(ignoreState, ignoreCondition);
 			}
+
+			return loopingState;
 		}
 
 		/**
-		 * Converts the given state into looping one. Looping state is one with TAKE edge to itself and
-		 * PROCEED edge to the sinkState. It also consists of a similar state without the PROCEED edge, so that
-		 * for each PROCEED transition branches in computation state graph  can be created only once.
+		 * Creates a state with {@link State.StateType#Normal} and adds it to the collection of created states.
+		 * Should be used instead of instantiating with new operator.
 		 *
-		 * @param sourceState the state to converted
-		 * @param sinkState   the state that the converted state should point to
+		 * @return the created state
 		 */
-		private void convertToLooping(final State<T> sourceState, final State<T> sinkState) {
-			convertToLooping(sourceState, sinkState, false);
+		private State<T> createNormalState() {
+			final State<T> state = new State<>(currentPattern.getName(), State.StateType.Normal);
+			states.add(state);
+			return state;
 		}
 
 		/**
@@ -381,7 +402,7 @@ public class NFACompiler {
 	 * has at most one TAKE and one IGNORE and name of each state is unique. No PROCEED transition is allowed!
 	 *
 	 * @param oldStartState dummy start state of old graph
-	 * @param <T> type of events
+	 * @param <T>           type of events
 	 * @return map of new states, where key is the name of a state and value is the state itself
 	 */
 	@Internal

http://git-wip-us.apache.org/repos/asf/flink/blob/aa3c395b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
index cd51788..14c3e2d 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
@@ -267,6 +267,76 @@ public class Pattern<T, F extends T> {
 	}
 
 	/**
+	 * Works in conjunction with {@link Pattern#zeroOrMore()}, {@link Pattern#oneOrMore()} or {@link Pattern#times(int)}.
+	 * Specifies that any not matching element breaks the loop.
+	 *
+	 * <p>E.g. a pattern like:
+	 * <pre>{@code
+	 * Pattern.<Event>begin("start").where(new FilterFunction<Event>() {
+	 *      @Override
+	 *      public boolean filter(Event value) throws Exception {
+	 *          return value.getName().equals("c");
+	 *      }
+	 * })
+	 * .followedBy("middle").where(new FilterFunction<Event>() {
+	 *      @Override
+	 *      public boolean filter(Event value) throws Exception {
+	 *          return value.getName().equals("a");
+	 *      }
+	 * })
+	 * }<b>.oneOrMore(true).consecutive()</b>{@code
+	 * .followedBy("end1").where(new FilterFunction<Event>() {
+	 *      @Override
+	 *      public boolean filter(Event value) throws Exception {
+	 *          return value.getName().equals("b");
+	 *      }
+	 * });
+	 * }</pre>
+	 *
+	 * <p>for a sequence: C D A1 A2 A3 D A4 B
+	 *
+	 * <p>will generate matches: {C A1 B}, {C A1 A2 B}, {C A1 A2 A3 B}
+	 *
+	 * <p><b>NOTICE:</b> This operator can be applied only when either zeroOrMore,
+	 * oneOrMore or times was previously applied!
+	 *
+	 * <p>By default a relaxed continuity is applied.
+	 *
+	 * @return pattern with continuity changed to strict
+	 */
+	public Pattern<T, F> consecutive() {
+		switch (this.quantifier) {
+
+			case ZERO_OR_MORE_EAGER:
+				this.quantifier = Quantifier.ZERO_OR_MORE_EAGER_STRICT;
+				break;
+			case ZERO_OR_MORE_COMBINATIONS:
+				this.quantifier = Quantifier.ZERO_OR_MORE_COMBINATIONS_STRICT;
+				break;
+			case ONE_OR_MORE_EAGER:
+				this.quantifier = Quantifier.ONE_OR_MORE_EAGER_STRICT;
+				break;
+			case ONE_OR_MORE_COMBINATIONS:
+				this.quantifier = Quantifier.ONE_OR_MORE_COMBINATIONS_STRICT;
+				break;
+			case TIMES:
+				this.quantifier = Quantifier.TIMES_STRICT;
+				break;
+			case ZERO_OR_MORE_COMBINATIONS_STRICT:
+			case ONE_OR_MORE_EAGER_STRICT:
+			case ONE_OR_MORE_COMBINATIONS_STRICT:
+			case ZERO_OR_MORE_EAGER_STRICT:
+			case TIMES_STRICT:
+				throw new MalformedPatternException("Strict continuity already applied! consecutive() called twice.");
+			case ONE:
+			case OPTIONAL:
+				throw new MalformedPatternException("Strict continuity cannot be applied to " + this.quantifier);
+		}
+
+		return this;
+	}
+
+	/**
 	 * Specifies that this pattern can occur zero or once.
 	 *
 	 * @return The same pattern with applied Kleene ? operator
@@ -300,4 +370,5 @@ public class Pattern<T, F extends T> {
 			throw new MalformedPatternException("Already applied quantifier to this Pattern. Current quantifier is: " + this.quantifier);
 		}
 	}
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/aa3c395b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java
index 7abe9bd..9789072 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java
@@ -23,12 +23,24 @@ public enum Quantifier {
 	ONE,
 	ZERO_OR_MORE_EAGER(QuantifierProperty.LOOPING, QuantifierProperty.EAGER),
 	ZERO_OR_MORE_COMBINATIONS(QuantifierProperty.LOOPING),
+	ZERO_OR_MORE_EAGER_STRICT(QuantifierProperty.EAGER, QuantifierProperty.STRICT, QuantifierProperty.LOOPING),
+	ZERO_OR_MORE_COMBINATIONS_STRICT(QuantifierProperty.STRICT, QuantifierProperty.LOOPING),
 	ONE_OR_MORE_EAGER(
 		QuantifierProperty.LOOPING,
 		QuantifierProperty.EAGER,
 		QuantifierProperty.AT_LEAST_ONE),
+	ONE_OR_MORE_EAGER_STRICT(
+		QuantifierProperty.STRICT,
+		QuantifierProperty.LOOPING,
+		QuantifierProperty.EAGER,
+		QuantifierProperty.AT_LEAST_ONE),
 	ONE_OR_MORE_COMBINATIONS(QuantifierProperty.LOOPING, QuantifierProperty.AT_LEAST_ONE),
-	TIMES,
+	ONE_OR_MORE_COMBINATIONS_STRICT(
+		QuantifierProperty.STRICT,
+		QuantifierProperty.LOOPING,
+		QuantifierProperty.AT_LEAST_ONE),
+	TIMES(QuantifierProperty.TIMES),
+	TIMES_STRICT(QuantifierProperty.TIMES, QuantifierProperty.STRICT),
 	OPTIONAL;
 
 	private final EnumSet<QuantifierProperty> properties;
@@ -48,7 +60,9 @@ public enum Quantifier {
 	public enum QuantifierProperty {
 		LOOPING,
 		EAGER,
-		AT_LEAST_ONE
+		AT_LEAST_ONE,
+		STRICT,
+		TIMES
 	}
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/aa3c395b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
index 197767e..da5f413 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
@@ -591,8 +591,9 @@ public class NFAITCase extends TestLogger {
 		inputEvents.add(new StreamRecord<>(startEvent, 1));
 		inputEvents.add(new StreamRecord<>(middleEvent1, 3));
 		inputEvents.add(new StreamRecord<>(middleEvent2, 4));
-		inputEvents.add(new StreamRecord<>(middleEvent3, 5));
-		inputEvents.add(new StreamRecord<>(end1, 6));
+		inputEvents.add(new StreamRecord<>(new Event(50, "d", 6.0), 5));
+		inputEvents.add(new StreamRecord<>(middleEvent3, 6));
+		inputEvents.add(new StreamRecord<>(end1, 7));
 
 		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
 			private static final long serialVersionUID = 5726188262756267490L;
@@ -642,7 +643,6 @@ public class NFAITCase extends TestLogger {
 		), resultingPatterns);
 	}
 
-
 	@Test
 	public void testBeginWithZeroOrMore() {
 		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
@@ -1129,7 +1129,7 @@ public class NFAITCase extends TestLogger {
 			public boolean filter(Event value) throws Exception {
 				return value.getName().equals("middle");
 			}
-		}).zeroOrMore(false).followedBy("end").where(new SimpleCondition<Event>() {
+		}).zeroOrMore().consecutive().followedBy("end").where(new SimpleCondition<Event>() {
 			private static final long serialVersionUID = 7056763917392056548L;
 
 			@Override
@@ -1281,7 +1281,6 @@ public class NFAITCase extends TestLogger {
 		), resultingPatterns);
 	}
 
-
 	@Test
 	public void testTimes() {
 		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
@@ -1659,9 +1658,293 @@ public class NFAITCase extends TestLogger {
 		), resultingPatterns);
 	}
 
-	/**
-	 * Clearing SharedBuffer
-	 */
+
+	///////////////////////////////         Consecutive           ////////////////////////////////////////
+
+	private static class ConsecutiveData {
+		static final Event startEvent = new Event(40, "c", 1.0);
+		static final Event middleEvent1 = new Event(41, "a", 2.0);
+		static final Event middleEvent2 = new Event(42, "a", 3.0);
+		static final Event middleEvent3 = new Event(43, "a", 4.0);
+		static final Event middleEvent4 = new Event(43, "a", 5.0);
+		static final Event end = new Event(44, "b", 5.0);
+
+		private ConsecutiveData() {
+		}
+	}
+
+	@Test
+	public void testStrictCombinationsOneOrMore() {
+		List<List<Event>> resultingPatterns = testStrictOneOrMore(false);
+
+		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
+			Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, ConsecutiveData.middleEvent3, ConsecutiveData.end),
+			Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, ConsecutiveData.end),
+			Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.end),
+			Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent2, ConsecutiveData.middleEvent3, ConsecutiveData.end),
+			Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent2, ConsecutiveData.end),
+			Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent3, ConsecutiveData.end),
+			Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent4, ConsecutiveData.end)
+		));
+	}
+
+	@Test
+	public void testStrictEagerOneOrMore() {
+		List<List<Event>> resultingPatterns = testStrictOneOrMore(true);
+
+		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
+			Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, ConsecutiveData.middleEvent3, ConsecutiveData.end),
+			Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, ConsecutiveData.end),
+			Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.end)
+		));
+	}
+
+	private List<List<Event>> testStrictOneOrMore(boolean eager) {
+		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+		inputEvents.add(new StreamRecord<>(ConsecutiveData.startEvent, 1));
+		inputEvents.add(new StreamRecord<>(new Event(50, "d", 6.0), 2));
+		inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent1, 3));
+		inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent2, 4));
+		inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent3, 5));
+		inputEvents.add(new StreamRecord<>(new Event(50, "d", 6.0), 6));
+		inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent4, 7));
+		inputEvents.add(new StreamRecord<>(ConsecutiveData.end, 8));
+
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("c");
+			}
+		}).followedBy("middle").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("a");
+			}
+		}).oneOrMore(eager).consecutive()
+			.followedBy("end1").where(new SimpleCondition<Event>() {
+				private static final long serialVersionUID = 5726188262756267490L;
+
+				@Override
+				public boolean filter(Event value) throws Exception {
+					return value.getName().equals("b");
+				}
+			});
+
+		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+		return feedNFA(inputEvents, nfa);
+	}
+
+	@Test
+	public void testStrictEagerZeroOrMore() {
+		List<List<Event>> resultingPatterns = testStrictZeroOrMore(true);
+
+		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
+			Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, ConsecutiveData.end),
+			Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.end),
+			Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.end)
+		));
+	}
+
+	@Test
+	public void testStrictCombinationsZeroOrMore() {
+		List<List<Event>> resultingPatterns = testStrictZeroOrMore(false);
+
+		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
+			Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, ConsecutiveData.end),
+			Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.end),
+			Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent2, ConsecutiveData.end),
+			Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent3, ConsecutiveData.end),
+			Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.end)
+		));
+	}
+
+	private List<List<Event>> testStrictZeroOrMore(boolean eager) {
+		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+		inputEvents.add(new StreamRecord<>(ConsecutiveData.startEvent, 1));
+		inputEvents.add(new StreamRecord<>(new Event(50, "d", 6.0), 2));
+		inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent1, 3));
+		inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent2, 4));
+		inputEvents.add(new StreamRecord<>(new Event(50, "d", 6.0), 5));
+		inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent3, 6));
+		inputEvents.add(new StreamRecord<>(ConsecutiveData.end, 7));
+
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("c");
+			}
+		}).followedBy("middle").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("a");
+			}
+		}).zeroOrMore(eager).consecutive().followedBy("end1").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("b");
+			}
+		});
+
+		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+		return feedNFA(inputEvents, nfa);
+	}
+
+
+	@Test
+	public void testTimesStrict() {
+		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+		inputEvents.add(new StreamRecord<>(ConsecutiveData.startEvent, 1));
+		inputEvents.add(new StreamRecord<>(new Event(23, "f", 1.0), 2));
+		inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent1, 3));
+		inputEvents.add(new StreamRecord<>(new Event(23, "f", 1.0), 4));
+		inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent2, 5));
+		inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent3, 6));
+		inputEvents.add(new StreamRecord<>(ConsecutiveData.end, 7));
+
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("c");
+			}
+		}).followedBy("middle").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("a");
+			}
+		}).times(2).consecutive().followedBy("end1").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("b");
+			}
+		});
+
+		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+		List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
+
+		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
+			Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent2, ConsecutiveData.middleEvent3, ConsecutiveData.end)
+		));
+	}
+
+	@Test
+	public void testTimesNonStrict() {
+		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+		inputEvents.add(new StreamRecord<>(ConsecutiveData.startEvent, 1));
+		inputEvents.add(new StreamRecord<>(new Event(23, "f", 1.0), 2));
+		inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent1, 3));
+		inputEvents.add(new StreamRecord<>(new Event(23, "f", 1.0), 4));
+		inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent2, 5));
+		inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent3, 6));
+		inputEvents.add(new StreamRecord<>(ConsecutiveData.end, 7));
+
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("c");
+			}
+		}).followedBy("middle").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("a");
+			}
+		}).times(2).followedBy("end1").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("b");
+			}
+		});
+
+		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+		List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
+
+		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
+			Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, ConsecutiveData.end),
+			Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent3, ConsecutiveData.end),
+			Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent2, ConsecutiveData.middleEvent3, ConsecutiveData.end)
+		));
+	}
+
+	@Test
+	public void testStartWithZeroOrMoreStrict() {
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("a");
+			}
+		}).zeroOrMore().consecutive();
+
+		testStartWithOneOrZeroOrMoreStrict(pattern);
+	}
+
+	@Test
+	public void testStartWithOneOrMoreStrict() {
+
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("a");
+			}
+		}).oneOrMore().consecutive();
+
+		testStartWithOneOrZeroOrMoreStrict(pattern);
+	}
+
+	private void testStartWithOneOrZeroOrMoreStrict(Pattern<Event, ?> pattern) {
+		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+		inputEvents.add(new StreamRecord<>(ConsecutiveData.startEvent, 1));
+		inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent1, 3));
+		inputEvents.add(new StreamRecord<>(ConsecutiveData.startEvent, 4));
+		inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent2, 5));
+		inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent3, 6));
+
+		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+		List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
+
+		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
+			Lists.newArrayList(ConsecutiveData.middleEvent1),
+			Lists.newArrayList(ConsecutiveData.middleEvent2, ConsecutiveData.middleEvent3),
+			Lists.newArrayList(ConsecutiveData.middleEvent2),
+			Lists.newArrayList(ConsecutiveData.middleEvent3)
+		));
+	}
+
+	///////////////////////////////     Clearing SharedBuffer     ////////////////////////////////////////
 
 	@Test
 	public void testTimesClearingBuffer() {
@@ -1934,17 +2217,7 @@ public class NFAITCase extends TestLogger {
 
 		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
 
-		List<List<Event>> resultingPatterns = new ArrayList<>();
-
-		for (StreamRecord<Event> inputEvent : inputEvents) {
-			Collection<Map<String, Event>> patterns = nfa.process(
-					inputEvent.getValue(),
-					inputEvent.getTimestamp()).f0;
-
-			for (Map<String, Event> p: patterns) {
-				resultingPatterns.add(new ArrayList<>(p.values()));
-			}
-		}
+		List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
 
 		return resultingPatterns;
 	}
@@ -2019,17 +2292,7 @@ public class NFAITCase extends TestLogger {
 
 		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
 
-		List<List<Event>> resultingPatterns = new ArrayList<>();
-
-		for (StreamRecord<Event> inputEvent : inputEvents) {
-			Collection<Map<String, Event>> patterns = nfa.process(
-					inputEvent.getValue(),
-					inputEvent.getTimestamp()).f0;
-
-			for (Map<String, Event> p: patterns) {
-				resultingPatterns.add(new ArrayList<>(p.values()));
-			}
-		}
+		List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
 
 		return resultingPatterns;
 	}
@@ -2068,17 +2331,7 @@ public class NFAITCase extends TestLogger {
 
 		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
 
-		List<List<Event>> resultingPatterns = new ArrayList<>();
-
-		for (StreamRecord<Event> inputEvent : inputEvents) {
-			Collection<Map<String, Event>> patterns = nfa.process(
-					inputEvent.getValue(),
-					inputEvent.getTimestamp()).f0;
-
-			for (Map<String, Event> p: patterns) {
-				resultingPatterns.add(new ArrayList<>(p.values()));
-			}
-		}
+		List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
 
 		compareMaps(resultingPatterns,
 				Lists.<List<Event>>newArrayList(
@@ -2145,17 +2398,7 @@ public class NFAITCase extends TestLogger {
 
 		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
 
-		List<List<Event>> resultingPatterns = new ArrayList<>();
-
-		for (StreamRecord<Event> inputEvent : inputEvents) {
-			Collection<Map<String, Event>> patterns = nfa.process(
-					inputEvent.getValue(),
-					inputEvent.getTimestamp()).f0;
-
-			for (Map<String, Event> p: patterns) {
-				resultingPatterns.add(new ArrayList<>(p.values()));
-			}
-		}
+		List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
 
 		compareMaps(resultingPatterns,
 				Lists.<List<Event>>newArrayList(
@@ -2212,17 +2455,7 @@ public class NFAITCase extends TestLogger {
 
 		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
 
-		List<List<Event>> resultingPatterns = new ArrayList<>();
-
-		for (StreamRecord<Event> inputEvent : inputEvents) {
-			Collection<Map<String, Event>> patterns = nfa.process(
-					inputEvent.getValue(),
-					inputEvent.getTimestamp()).f0;
-
-			for (Map<String, Event> p: patterns) {
-				resultingPatterns.add(new ArrayList<>(p.values()));
-			}
-		}
+		List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
 
 		compareMaps(resultingPatterns,
 				Lists.<List<Event>>newArrayList(
@@ -2237,6 +2470,21 @@ public class NFAITCase extends TestLogger {
 		);
 	}
 
+	private List<List<Event>> feedNFA(List<StreamRecord<Event>> inputEvents, NFA<Event> nfa) {
+		List<List<Event>> resultingPatterns = new ArrayList<>();
+
+		for (StreamRecord<Event> inputEvent : inputEvents) {
+			Collection<Map<String, Event>> patterns = nfa.process(
+				inputEvent.getValue(),
+				inputEvent.getTimestamp()).f0;
+
+			for (Map<String, Event> p: patterns) {
+				resultingPatterns.add(new ArrayList<>(p.values()));
+			}
+		}
+		return resultingPatterns;
+	}
+
 	private void compareMaps(List<List<Event>> actual, List<List<Event>> expected) {
 		Assert.assertEquals(expected.size(), actual.size());
 


Mime
View raw message