flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kklou...@apache.org
Subject [2/2] flink git commit: [FLINK-3320] Add NOT pattern support to CEP's pattern API
Date Fri, 05 May 2017 12:02:01 GMT
[FLINK-3320] Add NOT pattern support to CEP's pattern API

NOT patterns are not yet supported when an OPTIONAL
pattern directly preceeds a NOT. In these cases, an
exception is thrown that proposes an alternative
(but not the most efficient) way to support these
patterns.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5795ebe1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5795ebe1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5795ebe1

Branch: refs/heads/master
Commit: 5795ebe187d6e6dce2a52ec2d245071d101437e7
Parents: d7364ff
Author: Dawid Wysakowicz <dawid@getindata.com>
Authored: Fri Apr 28 15:24:37 2017 +0200
Committer: kl0u <kkloudas@gmail.com>
Committed: Fri May 5 13:59:36 2017 +0200

----------------------------------------------------------------------
 .../flink/cep/scala/pattern/Pattern.scala       |   24 +
 .../apache/flink/cep/nfa/ComputationState.java  |    4 +
 .../main/java/org/apache/flink/cep/nfa/NFA.java |   62 +-
 .../java/org/apache/flink/cep/nfa/State.java    |   13 +-
 .../flink/cep/nfa/compiler/NFACompiler.java     |  321 ++++--
 .../AbstractKeyedCEPPatternOperator.java        |    1 -
 .../org/apache/flink/cep/pattern/Pattern.java   |   42 +-
 .../apache/flink/cep/pattern/Quantifier.java    |   12 +-
 .../org/apache/flink/cep/nfa/NFAITCase.java     | 1056 +++++++++++++++++-
 .../flink/cep/nfa/compiler/NFACompilerTest.java |   35 +
 .../apache/flink/cep/pattern/PatternTest.java   |   76 +-
 11 files changed, 1497 insertions(+), 149 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/5795ebe1/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala b/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala
index 65b7ab0..fe7a30c 100644
--- a/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala
+++ b/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala
@@ -201,6 +201,17 @@ class Pattern[T , F <: T](jPattern: JPattern[T, F]) {
   }
 
   /**
+    * Appends a new pattern to the existing one. The new pattern enforces that there is no event
+    * matching this pattern right after the preceding matched event.
+    *
+    * @param name Name of the new pattern
+    * @return A new pattern which is appended to this one
+    */
+  def notNext(name: String): Pattern[T, T] = {
+    Pattern[T, T](jPattern.notNext(name))
+  }
+
+  /**
     * Appends a new pattern to the existing one. The new pattern enforces non-strict
     * temporal contiguity. This means that a matching event of this pattern and the
     * preceding matching event might be interleaved with other events which are ignored.
@@ -213,6 +224,19 @@ class Pattern[T , F <: T](jPattern: JPattern[T, F]) {
   }
 
   /**
+    * Appends a new pattern to the existing one. The new pattern enforces that there is no event
+    * matching this pattern between the preceding pattern and succeeding this one.
+    *
+    * NOTE: There has to be other pattern after this one.
+    *
+    * @param name Name of the new pattern
+    * @return A new pattern which is appended to this one
+    */
+  def notFollowedBy(name : String) {
+    Pattern[T, T](jPattern.notFollowedBy(name))
+  }
+
+  /**
     * Appends a new pattern to the existing one. The new pattern enforces non-strict
     * temporal contiguity. This means that a matching event of this pattern and the
     * preceding matching event might be interleaved with other events which are ignored.

http://git-wip-us.apache.org/repos/asf/flink/blob/5795ebe1/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/ComputationState.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/ComputationState.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/ComputationState.java
index 80227fc..08b9b78 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/ComputationState.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/ComputationState.java
@@ -127,6 +127,10 @@ public class ComputationState<T> {
 		return new ComputationState<>(nfa, currentState, previousState, event, timestamp, version, startTimestamp);
 	}
 
+	public boolean isStopState() {
+		return state.isStop();
+	}
+
 	/**
 	 * The context used when evaluating this computation state.
 	 */

http://git-wip-us.apache.org/repos/asf/flink/blob/5795ebe1/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
index 98c1fc9..b8c4e65 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
@@ -21,21 +21,6 @@ package org.apache.flink.cep.nfa;
 import com.google.common.base.Predicate;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.LinkedHashMultimap;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
-import org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream;
-import org.apache.flink.cep.NonDuplicatingTypeSerializer;
-import org.apache.flink.cep.nfa.compiler.NFACompiler;
-import org.apache.flink.cep.pattern.conditions.IterativeCondition;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataInputViewStreamWrapper;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
-import org.apache.flink.streaming.api.windowing.time.Time;
-import org.apache.flink.util.Preconditions;
-
-import javax.annotation.Nullable;
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
@@ -58,6 +43,20 @@ import java.util.Set;
 import java.util.Stack;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
+import javax.annotation.Nullable;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
+import org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream;
+import org.apache.flink.cep.NonDuplicatingTypeSerializer;
+import org.apache.flink.cep.nfa.compiler.NFACompiler;
+import org.apache.flink.cep.pattern.conditions.IterativeCondition;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.util.Preconditions;
 
 /**
  * Non-deterministic finite automaton implementation.
@@ -180,6 +179,9 @@ public class NFA<T> implements Serializable {
 	 * resulting event sequences are returned. If computations time out and timeout handling is
 	 * activated, then the timed out event patterns are returned.
 	 *
+	 * <p>If computations reach a stop state, the path forward is discarded and currently constructed path is returned
+	 * with the element that resulted in the stop state.
+	 *
 	 * @param event The current event to be processed or null if only pruning shall be done
 	 * @param timestamp The timestamp of the current event
 	 * @return Tuple of the collection of matched patterns (e.g. the result of computations which have
@@ -222,7 +224,12 @@ public class NFA<T> implements Serializable {
 				newComputationStates = Collections.singleton(computationState);
 			}
 
-			for (ComputationState<T> newComputationState: newComputationStates) {
+
+			//delay adding new computation states in case a stop state is reached and we discard the path.
+			final Collection<ComputationState<T>> statesToRetain = new ArrayList<>();
+			//if stop state reached in this path
+			boolean shouldDiscardPath = false;
+			for (final ComputationState<T> newComputationState: newComputationStates) {
 				if (newComputationState.isFinalState()) {
 					// we've reached a final state and can thus retrieve the matching event sequence
 					Collection<Map<String, T>> matches = extractPatternMatches(newComputationState);
@@ -233,11 +240,32 @@ public class NFA<T> implements Serializable {
 							newComputationState.getPreviousState().getName(),
 							newComputationState.getEvent(),
 							newComputationState.getTimestamp());
+				} else if (newComputationState.isStopState()) {
+					//reached stop state. release entry for the stop state
+					shouldDiscardPath = true;
+					stringSharedBuffer.release(
+						newComputationState.getPreviousState().getName(),
+						newComputationState.getEvent(),
+						newComputationState.getTimestamp());
 				} else {
 					// add new computation state; it will be processed once the next event arrives
-					computationStates.add(newComputationState);
+					statesToRetain.add(newComputationState);
 				}
 			}
+
+			if (shouldDiscardPath) {
+				// a stop state was reached in this branch. release branch which results in removing previous event from
+				// the buffer
+				for (final ComputationState<T> state : statesToRetain) {
+					stringSharedBuffer.release(
+						state.getPreviousState().getName(),
+						state.getEvent(),
+						state.getTimestamp());
+				}
+			} else {
+				computationStates.addAll(statesToRetain);
+			}
+
 		}
 
 		// prune shared buffer based on window length

http://git-wip-us.apache.org/repos/asf/flink/blob/5795ebe1/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/State.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/State.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/State.java
index 2503ffd..275266b 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/State.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/State.java
@@ -51,6 +51,10 @@ public class State<T> implements Serializable {
 		stateTransitions = new ArrayList<>();
 	}
 
+	public StateType getStateType() {
+		return stateType;
+	}
+
 	public boolean isFinal() {
 		return stateType == StateType.Final;
 	}
@@ -69,7 +73,7 @@ public class State<T> implements Serializable {
 		this.stateType = StateType.Start;
 	}
 
-	private void addStateTransition(
+	public void addStateTransition(
 			final StateTransitionAction action,
 			final State<T> targetState,
 			final IterativeCondition<T> condition) {
@@ -130,13 +134,18 @@ public class State<T> implements Serializable {
 		return Objects.hash(name, stateType, stateTransitions);
 	}
 
+	public boolean isStop() {
+		return stateType == StateType.Stop;
+	}
+
 	/**
 	 * Set of valid state types.
 	 */
 	public enum StateType {
 		Start, // the state is a starting state for the NFA
 		Final, // the state is a final state for the NFA
-		Normal // the state is neither a start nor a final state
+		Normal, // the state is neither a start nor a final state
+		Stop
 	}
 
 	private void readObject(ObjectInputStream ois) throws IOException, ClassNotFoundException {

http://git-wip-us.apache.org/repos/asf/flink/blob/5795ebe1/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
index 0ca0e14..39c18b9 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
@@ -32,6 +32,7 @@ import java.util.Set;
 import javax.annotation.Nullable;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.cep.nfa.NFA;
 import org.apache.flink.cep.nfa.State;
 import org.apache.flink.cep.nfa.StateTransition;
@@ -101,13 +102,15 @@ public class NFACompiler {
 	 *
 	 * @param <T>
 	 */
-	private static class NFAFactoryCompiler<T> {
+	static class NFAFactoryCompiler<T> {
 
 		private final Set<String> usedNames = new HashSet<>();
+		private final Map<String, State<T>> stopStates = new HashMap<>();
 		private final List<State<T>> states = new ArrayList<>();
 
 		private long windowTime = 0;
 		private Pattern<T, ?> currentPattern;
+		private Pattern<T, ?> followingPattern;
 
 		NFAFactoryCompiler(final Pattern<T, ?> pattern) {
 			this.currentPattern = pattern;
@@ -118,6 +121,10 @@ public class NFACompiler {
 		 * multiple NFAs.
 		 */
 		void compileFactory() {
+			if (currentPattern.getQuantifier().getConsumingStrategy() == Quantifier.ConsumingStrategy.NOT_FOLLOW) {
+				throw new MalformedPatternException("NotFollowedBy is not supported as a last part of a Pattern!");
+			}
+
 			// we're traversing the pattern from the end to the beginning --> the first state is the final state
 			State<T> sinkState = createEndingState();
 			// add all the normal states
@@ -135,13 +142,45 @@ public class NFACompiler {
 		}
 
 		/**
+		 * Retrieves list of conditions resulting in Stop state and names of the corresponding NOT patterns.
+		 *
+		 * <p>A current not condition can be produced in two cases:
+		 * <ol>
+		 *     <li>the previous pattern is a {@link Quantifier.ConsumingStrategy#NOT_FOLLOW}</li>
+		 *     <li>exists a backward path of {@link Quantifier.QuantifierProperty#OPTIONAL} patterns to
+		 *       {@link Quantifier.ConsumingStrategy#NOT_FOLLOW}</li>
+		 * </ol>
+		 *
+		 * <p><b>WARNING:</b> for more info on the second case see: {@link NFAFactoryCompiler#copyWithoutTransitiveNots(State)}
+		 *
+		 * @return list of not conditions with corresponding names
+		 */
+		private List<Tuple2<IterativeCondition<T>, String>> getCurrentNotCondition() {
+			List<Tuple2<IterativeCondition<T>, String>> notConditions = new ArrayList<>();
+
+			Pattern<T, ? extends T> previousPattern = currentPattern;
+			while (previousPattern.getPrevious() != null && (
+				previousPattern.getPrevious().getQuantifier().hasProperty(Quantifier.QuantifierProperty.OPTIONAL) ||
+				previousPattern.getPrevious().getQuantifier().getConsumingStrategy() == Quantifier.ConsumingStrategy.NOT_FOLLOW)) {
+
+				previousPattern = previousPattern.getPrevious();
+
+				if (previousPattern.getQuantifier().getConsumingStrategy() == Quantifier.ConsumingStrategy.NOT_FOLLOW) {
+					final IterativeCondition<T> notCondition = (IterativeCondition<T>) previousPattern.getCondition();
+					notConditions.add(Tuple2.of(notCondition, previousPattern.getName()));
+				}
+			}
+			return notConditions;
+		}
+
+		/**
 		 * Creates the dummy Final {@link State} of the NFA graph.
 		 * @return dummy Final state
 		 */
 		private State<T> createEndingState() {
+			checkPatternNameUniqueness(ENDING_STATE_NAME);
 			State<T> endState = new State<>(ENDING_STATE_NAME, State.StateType.Final);
 			states.add(endState);
-			usedNames.add(ENDING_STATE_NAME);
 
 			windowTime = currentPattern.getWindowTime() != null ? currentPattern.getWindowTime().toMilliseconds() : 0L;
 			return endState;
@@ -154,10 +193,30 @@ public class NFACompiler {
 		 * @return the next state after Start in the resulting graph
 		 */
 		private State<T> createMiddleStates(final State<T> sinkState) {
-
 			State<T> lastSink = sinkState;
 			while (currentPattern.getPrevious() != null) {
-				lastSink = convertPattern(lastSink);
+
+				if (currentPattern.getQuantifier().getConsumingStrategy() == Quantifier.ConsumingStrategy.NOT_FOLLOW) {
+					//skip notFollow patterns, they are converted into edge conditions
+				} else if (currentPattern.getQuantifier().getConsumingStrategy() == Quantifier.ConsumingStrategy.NOT_NEXT) {
+					final State<T> notNext = createNormalState();
+					final IterativeCondition<T> notCondition = (IterativeCondition<T>) currentPattern.getCondition();
+					final State<T> stopState = createStopState(notCondition, currentPattern.getName());
+
+					if (lastSink.isFinal()) {
+						//so that the proceed to final is not fired
+						notNext.addIgnore(lastSink, new NotCondition<>(notCondition));
+					} else {
+						notNext.addProceed(lastSink, new NotCondition<>(notCondition));
+					}
+					notNext.addProceed(stopState, notCondition);
+					lastSink = notNext;
+				} else {
+					lastSink = convertPattern(lastSink);
+				}
+
+				// we traverse the pattern graph backwards
+				followingPattern = currentPattern;
 				currentPattern = currentPattern.getPrevious();
 
 				final Time currentWindowTime = currentPattern.getWindowTime();
@@ -166,79 +225,152 @@ public class NFACompiler {
 					windowTime = currentWindowTime.toMilliseconds();
 				}
 			}
-
 			return lastSink;
 		}
 
+		/**
+		 * Creates the Start {@link State} of the resulting NFA graph.
+		 *
+		 * @param sinkState the state that Start state should point to (always first state of middle states)
+		 * @return created state
+		 */
+		@SuppressWarnings("unchecked")
+		private State<T> createStartState(State<T> sinkState) {
+			final State<T> beginningState = convertPattern(sinkState);
+			beginningState.makeStart();
+			return beginningState;
+		}
+
 		private State<T> convertPattern(final State<T> sinkState) {
 			final State<T> lastSink;
-			checkPatternNameUniqueness();
-			usedNames.add(currentPattern.getName());
+			checkPatternNameUniqueness(currentPattern.getName());
 
 			final Quantifier quantifier = currentPattern.getQuantifier();
 			if (quantifier.hasProperty(Quantifier.QuantifierProperty.LOOPING)) {
-				final State<T> looping = createLooping(sinkState);
+
+				// if loop has started then all notPatterns previous to the optional states are no longer valid
+				final State<T> sink = copyWithoutTransitiveNots(sinkState);
+				final State<T> looping = createLooping(sink);
 
 				if (!quantifier.hasProperty(Quantifier.QuantifierProperty.OPTIONAL)) {
-					lastSink = createFirstMandatoryStateOfLoop(looping);
+					lastSink = createInitMandatoryStateOfOneOrMore(looping);
 				} else {
-					lastSink = createWaitingStateForZeroOrMore(looping, sinkState);
+					lastSink = createInitOptionalStateOfZeroOrMore(looping, sinkState);
 				}
 			} else if (quantifier.hasProperty(Quantifier.QuantifierProperty.TIMES)) {
 				lastSink = createTimesState(sinkState, currentPattern.getTimes());
 			} else {
 				lastSink = createSingletonState(sinkState);
 			}
+			addStopStates(lastSink);
 
 			return lastSink;
 		}
 
 		/**
-		 * Creates a pair of states that enables relaxed strictness before a zeroOrMore looping state.
+		 * Creates a state with {@link State.StateType#Normal} and adds it to the collection of created states.
+		 * Should be used instead of instantiating with new operator.
 		 *
-		 * @param loopingState the first state of zeroOrMore complex state
-		 * @param lastSink     the state that the looping one points to
-		 * @return the newly created state
+		 * @return the created state
 		 */
-		@SuppressWarnings("unchecked")
-		private State<T> createWaitingStateForZeroOrMore(final State<T> loopingState, final State<T> lastSink) {
-			final IterativeCondition<T> currentFunction = (IterativeCondition<T>)currentPattern.getCondition();
+		private State<T> createNormalState() {
+			final State<T> state = new State<>(currentPattern.getName(), State.StateType.Normal);
+			states.add(state);
+			return state;
+		}
 
-			final State<T> followByState = createNormalState();
-			followByState.addProceed(lastSink, BooleanConditions.<T>trueFunction());
-			followByState.addTake(loopingState, currentFunction);
+		private State<T> createStopState(final IterativeCondition<T> notCondition, final String name) {
+			// We should not duplicate the notStates. All states from which we can stop should point to the same one.
+			State<T> stopState = stopStates.get(name);
+			if (stopState == null) {
+				stopState = new State<>(name, State.StateType.Stop);
+				states.add(stopState);
+				stopState.addTake(notCondition);
+				stopStates.put(name, stopState);
+			}
+			return stopState;
+		}
 
-			final IterativeCondition<T> ignoreFunction = getIgnoreCondition(currentPattern);
-			if (ignoreFunction != null) {
-				final State<T> followByStateWithoutProceed = createNormalState();
-				followByState.addIgnore(followByStateWithoutProceed, ignoreFunction);
-				followByStateWithoutProceed.addIgnore(ignoreFunction);
-				followByStateWithoutProceed.addTake(loopingState, currentFunction);
+		/**
+		 * This method creates an alternative state that is target for TAKE transition from an optional State.
+		 * Accepting an event in optional State discards all not Patterns that were present before it.
+		 *
+		 * <p>E.g for a Pattern begin("a").notFollowedBy("b").followedByAny("c").optional().followedByAny("d")
+		 * a sequence like : {a c b d} is a valid match, but {a b d} is not.
+		 *
+		 * <p><b>NOTICE:</b> This method creates copy only if it necessary.
+		 *
+		 * @param sinkState a state to create copy without transitive nots
+		 * @return the copy of the state itself if no modifications were needed
+		 */
+		private State<T> copyWithoutTransitiveNots(final State<T> sinkState) {
+			final List<Tuple2<IterativeCondition<T>, String>> currentNotCondition = getCurrentNotCondition();
+
+			if (currentNotCondition.isEmpty() ||
+				!currentPattern.getQuantifier().hasProperty(Quantifier.QuantifierProperty.OPTIONAL)) {
+				//we do not create an alternative path if we are NOT in an OPTIONAL state or there is no NOTs prior to
+				//the optional state
+				return sinkState;
 			}
 
-			return followByState;
+			final State<T> copyOfSink = new State<>(sinkState.getName(), sinkState.getStateType());
+			states.add(copyOfSink);
+
+			for (StateTransition<T> tStateTransition : sinkState.getStateTransitions()) {
+
+				if (tStateTransition.getAction() == StateTransitionAction.PROCEED) {
+					State<T> targetState = tStateTransition.getTargetState();
+					boolean remove = false;
+					if (targetState.isStop()) {
+						for (Tuple2<IterativeCondition<T>, String> notCondition : currentNotCondition) {
+							if (targetState.getName().equals(notCondition.f1)) {
+								remove = true;
+							}
+						}
+					} else {
+						targetState = copyWithoutTransitiveNots(tStateTransition.getTargetState());
+					}
+
+					if (!remove) {
+						copyOfSink.addStateTransition(tStateTransition.getAction(), targetState, tStateTransition.getCondition());
+					}
+				} else {
+					copyOfSink.addStateTransition(
+							tStateTransition.getAction(),
+							tStateTransition.getTargetState().equals(tStateTransition.getSourceState())
+									? copyOfSink
+									: tStateTransition.getTargetState(),
+							tStateTransition.getCondition()
+					);
+				}
+
+			}
+			return copyOfSink;
 		}
 
-		private void checkPatternNameUniqueness() {
-			if (usedNames.contains(currentPattern.getName())) {
-				throw new MalformedPatternException(
-					"Duplicate pattern name: " + currentPattern.getName() + ". " +
-					"Pattern names must be unique.");
+		private void addStopStates(final State<T> state) {
+			for (Tuple2<IterativeCondition<T>, String> notCondition: getCurrentNotCondition()) {
+				final State<T> stopState = createStopState(notCondition.f0, notCondition.f1);
+				state.addProceed(stopState, notCondition.f0);
 			}
 		}
 
-		/**
-		 * Creates the Start {@link State} of the resulting NFA graph.
-		 *
-		 * @param sinkState the state that Start state should point to (always first state of middle states)
-		 * @return created state
-		 */
-		@SuppressWarnings("unchecked")
-		private State<T> createStartState(State<T> sinkState) {
-			final State<T> beginningState = convertPattern(sinkState);
-			beginningState.makeStart();
+		private void addStopStateToLooping(final State<T> loopingState) {
+			if (followingPattern != null &&
+					followingPattern.getQuantifier().getConsumingStrategy() == Quantifier.ConsumingStrategy.NOT_FOLLOW) {
+				final IterativeCondition<T> notCondition = (IterativeCondition<T>) followingPattern.getCondition();
+				final State<T> stopState = createStopState(notCondition, followingPattern.getName());
+				loopingState.addProceed(stopState, notCondition);
+			}
+		}
 
-			return beginningState;
+		private void checkPatternNameUniqueness(String patternName) {
+			if (usedNames.contains(currentPattern.getName())) {
+				throw new MalformedPatternException(
+					"Duplicate pattern name: " + patternName + ". " +
+					"Pattern names must be unique.");
+			}
+			usedNames.add(patternName);
 		}
 
 		/**
@@ -250,12 +382,13 @@ public class NFACompiler {
 		 * @return the first state of the "complex" state, next state should point to it
 		 */
 		private State<T> createTimesState(final State<T> sinkState, int times) {
-			State<T> lastSink = sinkState;
+			State<T> lastSink = copyWithoutTransitiveNots(sinkState);
 			for (int i = 0; i < times - 1; i++) {
 				lastSink = createSingletonState(lastSink, getInnerIgnoreCondition(currentPattern), false);
+				addStopStateToLooping(lastSink);
 			}
 
-			final IterativeCondition<T> currentFilterFunction = (IterativeCondition<T>) currentPattern.getCondition();
+			final IterativeCondition<T> currentCondition = (IterativeCondition<T>) currentPattern.getCondition();
 			final IterativeCondition<T> ignoreCondition = getIgnoreCondition(currentPattern);
 
 			// we created the intermediate states in the loop, now we create the start of the loop.
@@ -264,14 +397,15 @@ public class NFACompiler {
 			}
 
 			final State<T> singletonState = createNormalState();
-			singletonState.addTake(lastSink, currentFilterFunction);
+			singletonState.addTake(lastSink, currentCondition);
 			singletonState.addProceed(sinkState, BooleanConditions.<T>trueFunction());
 
 			if (ignoreCondition != null) {
 				State<T> ignoreState = createNormalState();
-				ignoreState.addTake(lastSink, currentFilterFunction);
+				ignoreState.addTake(lastSink, currentCondition);
 				ignoreState.addIgnore(ignoreCondition);
 				singletonState.addIgnore(ignoreState, ignoreCondition);
+				addStopStates(ignoreState);
 			}
 			return singletonState;
 		}
@@ -303,13 +437,16 @@ public class NFACompiler {
 		 */
 		@SuppressWarnings("unchecked")
 		private State<T> createSingletonState(final State<T> sinkState, final IterativeCondition<T> ignoreCondition, final boolean isOptional) {
-			final IterativeCondition<T> currentFilterFunction = (IterativeCondition<T>) currentPattern.getCondition();
+			final IterativeCondition<T> currentCondition = (IterativeCondition<T>) currentPattern.getCondition();
 			final IterativeCondition<T> trueFunction = BooleanConditions.trueFunction();
 
 			final State<T> singletonState = createNormalState();
-			singletonState.addTake(sinkState, currentFilterFunction);
+			// if event is accepted then all notPatterns previous to the optional states are no longer valid
+			final State<T> sink = copyWithoutTransitiveNots(sinkState);
+			singletonState.addTake(sink, currentCondition);
 
 			if (isOptional) {
+				// if no element accepted the previous nots are still valid.
 				singletonState.addProceed(sinkState, trueFunction);
 			}
 
@@ -317,7 +454,9 @@ public class NFACompiler {
 				final State<T> ignoreState;
 				if (isOptional) {
 					ignoreState = createNormalState();
-					ignoreState.addTake(sinkState, currentFilterFunction);
+					ignoreState.addTake(sink, currentCondition);
+					ignoreState.addIgnore(ignoreCondition);
+					addStopStates(ignoreState);
 				} else {
 					ignoreState = singletonState;
 				}
@@ -327,27 +466,6 @@ public class NFACompiler {
 		}
 
 		/**
-		 * Patterns with quantifiers AT_LEAST_ONE_* are created as a pair of states: a singleton state and
-		 * looping state. This method creates the first of the two.
-		 *
-		 * @param sinkState the state the newly created state should point to, it should be a looping state
-		 * @return the newly created state
-		 */
-		@SuppressWarnings("unchecked")
-		private State<T> createFirstMandatoryStateOfLoop(final State<T> sinkState) {
-
-			final IterativeCondition<T> currentFilterFunction = (IterativeCondition<T>) currentPattern.getCondition();
-			final State<T> firstState = createNormalState();
-
-			firstState.addTake(sinkState, currentFilterFunction);
-			final IterativeCondition<T> ignoreCondition = getIgnoreCondition(currentPattern);
-			if (ignoreCondition != null) {
-				firstState.addIgnore(ignoreCondition);
-			}
-			return firstState;
-		}
-
-		/**
 		 * Creates the given state as a looping one. Looping state is one with TAKE edge to itself and
 		 * PROCEED edge to the sinkState. It also consists of a similar state without the PROCEED edge, so that
 		 * for each PROCEED transition branches in computation state graph  can be created only once.
@@ -357,34 +475,73 @@ public class NFACompiler {
 		 */
 		@SuppressWarnings("unchecked")
 		private State<T> createLooping(final State<T> sinkState) {
-			final IterativeCondition<T> filterFunction = (IterativeCondition<T>) currentPattern.getCondition();
+			final IterativeCondition<T> currentCondition = (IterativeCondition<T>) currentPattern.getCondition();
 			final IterativeCondition<T> ignoreCondition = getInnerIgnoreCondition(currentPattern);
 			final IterativeCondition<T> trueFunction = BooleanConditions.trueFunction();
 
 			final State<T> loopingState = createNormalState();
 			loopingState.addProceed(sinkState, trueFunction);
-			loopingState.addTake(filterFunction);
+			loopingState.addTake(currentCondition);
+
+			addStopStateToLooping(loopingState);
 
 			if (ignoreCondition != null) {
 				final State<T> ignoreState = createNormalState();
-				ignoreState.addTake(loopingState, filterFunction);
+				ignoreState.addTake(loopingState, currentCondition);
 				ignoreState.addIgnore(ignoreCondition);
 				loopingState.addIgnore(ignoreState, ignoreCondition);
-			}
 
+				addStopStateToLooping(ignoreState);
+			}
 			return loopingState;
 		}
 
 		/**
-		 * Creates a state with {@link State.StateType#Normal} and adds it to the collection of created states.
-		 * Should be used instead of instantiating with new operator.
+		 * Patterns with quantifiers AT_LEAST_ONE_* are created as a pair of states: a singleton state and
+		 * looping state. This method creates the first of the two.
 		 *
-		 * @return the created state
+		 * @param sinkState the state the newly created state should point to, it should be a looping state
+		 * @return the newly created state
 		 */
-		private State<T> createNormalState() {
-			final State<T> state = new State<>(currentPattern.getName(), State.StateType.Normal);
-			states.add(state);
-			return state;
+		@SuppressWarnings("unchecked")
+		private State<T> createInitMandatoryStateOfOneOrMore(final State<T> sinkState) {
+			final IterativeCondition<T> currentCondition = (IterativeCondition<T>) currentPattern.getCondition();
+
+			final State<T> firstState = createNormalState();
+			firstState.addTake(sinkState, currentCondition);
+
+			final IterativeCondition<T> ignoreCondition = getIgnoreCondition(currentPattern);
+			if (ignoreCondition != null) {
+				firstState.addIgnore(ignoreCondition);
+			}
+			return firstState;
+		}
+
+		/**
+		 * Creates a pair of states that enables relaxed strictness before a zeroOrMore looping state.
+		 *
+		 * @param loopingState the first state of zeroOrMore complex state
+		 * @param lastSink     the state that the looping one points to
+		 * @return the newly created state
+		 */
+		@SuppressWarnings("unchecked")
+		private State<T> createInitOptionalStateOfZeroOrMore(final State<T> loopingState, final State<T> lastSink) {
+			final IterativeCondition<T> currentCondition = (IterativeCondition<T>) currentPattern.getCondition();
+
+			final State<T> firstState = createNormalState();
+			firstState.addProceed(lastSink, BooleanConditions.<T>trueFunction());
+			firstState.addTake(loopingState, currentCondition);
+
+			final IterativeCondition<T> ignoreFunction = getIgnoreCondition(currentPattern);
+			if (ignoreFunction != null) {
+				final State<T> firstStateWithoutProceed = createNormalState();
+				firstState.addIgnore(firstStateWithoutProceed, ignoreFunction);
+				firstStateWithoutProceed.addIgnore(ignoreFunction);
+				firstStateWithoutProceed.addTake(loopingState, currentCondition);
+
+				addStopStates(firstStateWithoutProceed);
+			}
+			return firstState;
 		}
 
 		/**

http://git-wip-us.apache.org/repos/asf/flink/blob/5795ebe1/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
index cd62c0d..b6374cd 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
@@ -218,7 +218,6 @@ public abstract class AbstractKeyedCEPPatternOperator<IN, KEY, OUT>
 		}
 	}
 
-	long count = 0;
 	@Override
 	public void onEventTime(InternalTimer<KEY, VoidNamespace> timer) throws Exception {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/5795ebe1/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
index b100bc5..3cf25ef 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
@@ -210,7 +210,18 @@ public class Pattern<T, F extends T> {
 	 * @return A new pattern which is appended to this one
 	 */
 	public Pattern<T, T> next(final String name) {
-		return new Pattern<T, T>(name, this, ConsumingStrategy.STRICT);
+		return new Pattern<>(name, this, ConsumingStrategy.STRICT);
+	}
+
+	/**
+	 * Appends a new pattern to the existing one. The new pattern enforces that there is no event matching this pattern
+	 * right after the preceding matched event.
+	 *
+	 * @param name Name of the new pattern
+	 * @return A new pattern which is appended to this one
+	 */
+	public Pattern<T, T> notNext(final String name) {
+		return new Pattern<>(name, this, ConsumingStrategy.NOT_NEXT);
 	}
 
 	/**
@@ -226,6 +237,26 @@ public class Pattern<T, F extends T> {
 	}
 
 	/**
+	 * Appends a new pattern to the existing one. The new pattern enforces that there is no event matching this pattern
+	 * between the preceding pattern and succeeding this one.
+	 *
+	 * <p><b>NOTE:</b> There has to be other pattern after this one.
+	 *
+	 * @param name Name of the new pattern
+	 * @return A new pattern which is appended to this one
+	 */
+	public Pattern<T, T> notFollowedBy(final String name) {
+		if (quantifier.hasProperty(Quantifier.QuantifierProperty.OPTIONAL)) {
+			throw new MalformedPatternException(
+				"Specifying a pattern with an optional path to NOT condition is not supported yet. " +
+				"You can simulate such pattern with two independent patterns, one with and the other without " +
+				"the optional part.");
+		}
+
+		return new Pattern<>(name, this, ConsumingStrategy.NOT_FOLLOW);
+	}
+
+	/**
 	 * Appends a new pattern to the existing one. The new pattern enforces non-strict
 	 * temporal contiguity. This means that a matching event of this pattern and the
 	 * preceding matching event might be interleaved with other events which are ignored.
@@ -263,6 +294,7 @@ public class Pattern<T, F extends T> {
 	 * @throws MalformedPatternException if the quantifier is not applicable to this pattern.
 	 */
 	public Pattern<T, F> oneOrMore() {
+		checkIfNoNotPattern();
 		checkIfQuantifierApplied();
 		this.quantifier = Quantifier.ONE_OR_MORE(quantifier.getConsumingStrategy());
 		return this;
@@ -277,6 +309,7 @@ public class Pattern<T, F extends T> {
 	 * @throws MalformedPatternException if the quantifier is not applicable to this pattern.
 	 */
 	public Pattern<T, F> times(int times) {
+		checkIfNoNotPattern();
 		checkIfQuantifierApplied();
 		Preconditions.checkArgument(times > 0, "You should give a positive number greater than 0.");
 		this.quantifier = Quantifier.TIMES(quantifier.getConsumingStrategy());
@@ -341,6 +374,13 @@ public class Pattern<T, F extends T> {
 		return this;
 	}
 
+	private void checkIfNoNotPattern() {
+		if (quantifier.getConsumingStrategy() == ConsumingStrategy.NOT_FOLLOW ||
+				quantifier.getConsumingStrategy() == ConsumingStrategy.NOT_NEXT) {
+			throw new MalformedPatternException("Option not applicable to NOT pattern");
+		}
+	}
+
 	private void checkIfQuantifierApplied() {
 		if (!quantifier.hasProperty(Quantifier.QuantifierProperty.SINGLE)) {
 			throw new MalformedPatternException("Already applied quantifier to this Pattern. " +

http://git-wip-us.apache.org/repos/asf/flink/blob/5795ebe1/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java
index b0f882c..382c3ba 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java
@@ -83,9 +83,10 @@ public class Quantifier {
 	}
 
 	public void optional() {
-		if (hasProperty(Quantifier.QuantifierProperty.OPTIONAL)) {
-			throw new MalformedPatternException("Optional already applied!");
-		}
+		checkPattern(!hasProperty(QuantifierProperty.OPTIONAL), "Optional already applied!");
+		checkPattern(!(consumingStrategy == ConsumingStrategy.NOT_NEXT ||
+					consumingStrategy == ConsumingStrategy.NOT_FOLLOW), "NOT pattern cannot be optional");
+
 		properties.add(Quantifier.QuantifierProperty.OPTIONAL);
 	}
 
@@ -120,7 +121,10 @@ public class Quantifier {
 	public enum ConsumingStrategy {
 		STRICT,
 		SKIP_TILL_NEXT,
-		SKIP_TILL_ANY
+		SKIP_TILL_ANY,
+
+		NOT_FOLLOW,
+		NOT_NEXT
 	}
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/5795ebe1/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
index fe31564..ab6ff82 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
@@ -393,8 +393,7 @@ public class NFAITCase extends TestLogger {
 		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), true);
 
 		for (StreamRecord<Event> event: events) {
-			Tuple2<Collection<Map<String, Event>>, Collection<Tuple2<Map<String, Event>, Long>>> patterns =
-				nfa.process(event.getValue(), event.getTimestamp());
+			final Tuple2<Collection<Map<String, Event>>, Collection<Tuple2<Map<String, Event>, Long>>> patterns = nfa.process(event.getValue(), event.getTimestamp());
 
 			Collection<Map<String, Event>> matchedPatterns = patterns.f0;
 			Collection<Tuple2<Map<String, Event>, Long>> timeoutPatterns = patterns.f1;
@@ -786,14 +785,16 @@ public class NFAITCase extends TestLogger {
 			public boolean filter(Event value) throws Exception {
 				return value.getName().equals("a");
 			}
-		}).oneOrMore().allowCombinations().optional().followedByAny("middle-second").where(new SimpleCondition<Event>() {
+		}).oneOrMore().allowCombinations().optional()
+			.followedBy("middle-second").where(new SimpleCondition<Event>() {
 			private static final long serialVersionUID = 5726188262756267490L;
 
 			@Override
 			public boolean filter(Event value) throws Exception {
 				return value.getName().equals("d");
 			}
-		}).oneOrMore().allowCombinations().optional().followedBy("end").where(new SimpleCondition<Event>() {
+		}).oneOrMore().allowCombinations().optional()
+			.followedBy("end").where(new SimpleCondition<Event>() {
 			private static final long serialVersionUID = 5726188262756267490L;
 
 			@Override
@@ -818,13 +819,11 @@ public class NFAITCase extends TestLogger {
 			}
 		}
 
-		assertEquals(8, allPatterns.size());
+		assertEquals(6, allPatterns.size());
 		assertEquals(Sets.newHashSet(
 			Sets.newHashSet(startEvent, middleEvent1, middleEvent2, middleEvent3, end),
-			Sets.newHashSet(startEvent, middleEvent1, middleEvent3, end),
 			Sets.newHashSet(startEvent, middleEvent1, middleEvent2, end),
 			Sets.newHashSet(startEvent, middleEvent2, middleEvent3, end),
-			Sets.newHashSet(startEvent, middleEvent3, end),
 			Sets.newHashSet(startEvent, middleEvent2, end),
 			Sets.newHashSet(startEvent, middleEvent1, end),
 			Sets.newHashSet(startEvent, end)
@@ -3147,7 +3146,1048 @@ public class NFAITCase extends TestLogger {
 	}
 
 
-	/////////////////////////////////////////    Utility      ////////////////////////////////
+	/////////////////////////////////////////       Not pattern    /////////////////////////////////////////////////
+
+	@Test
+	public void testNotNext() {
+		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+		Event a1 = new Event(40, "a", 1.0);
+		Event c1 = new Event(41, "c", 2.0);
+		Event b1 = new Event(42, "b", 3.0);
+		Event c2 = new Event(43, "c", 4.0);
+		Event d = new Event(43, "d", 4.0);
+
+		inputEvents.add(new StreamRecord<>(a1, 1));
+		inputEvents.add(new StreamRecord<>(c1, 2));
+		inputEvents.add(new StreamRecord<>(b1, 3));
+		inputEvents.add(new StreamRecord<>(c2, 4));
+		inputEvents.add(new StreamRecord<>(d, 5));
+
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("a");
+			}
+		}).notNext("notPattern").where(new SimpleCondition<Event>() {
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("b");
+			}
+		}).followedByAny("middle").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("c");
+			}
+		}).followedBy("end").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("d");
+			}
+		});
+
+		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+		final List<List<Event>> matches = feedNFA(inputEvents, nfa);
+
+		compareMaps(matches, Lists.<List<Event>>newArrayList(
+			Lists.newArrayList(a1, c1, d),
+			Lists.newArrayList(a1, c2, d)
+		));
+	}
+
+	@Test
+	public void testNotNextNoMatches() {
+		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+		Event a1 = new Event(40, "a", 1.0);
+		Event b1 = new Event(42, "b", 3.0);
+		Event c1 = new Event(41, "c", 2.0);
+		Event c2 = new Event(43, "c", 4.0);
+		Event d = new Event(43, "d", 4.0);
+
+		inputEvents.add(new StreamRecord<>(a1, 1));
+		inputEvents.add(new StreamRecord<>(b1, 2));
+		inputEvents.add(new StreamRecord<>(c1, 3));
+		inputEvents.add(new StreamRecord<>(c2, 4));
+		inputEvents.add(new StreamRecord<>(d, 5));
+
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("a");
+			}
+		}).notNext("notPattern").where(new SimpleCondition<Event>() {
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("b");
+			}
+		}).followedBy("middle").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("c");
+			}
+		}).followedBy("end").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("d");
+			}
+		});
+
+		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+		final List<List<Event>> matches = feedNFA(inputEvents, nfa);
+
+		assertEquals(0, matches.size());
+	}
+
+	@Test
+	public void testNotNextNoMatchesAtTheEnd() {
+		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+		Event a1 = new Event(40, "a", 1.0);
+		Event c1 = new Event(41, "c", 2.0);
+		Event c2 = new Event(43, "c", 4.0);
+		Event d = new Event(43, "d", 4.0);
+		Event b1 = new Event(42, "b", 3.0);
+
+		inputEvents.add(new StreamRecord<>(a1, 1));
+		inputEvents.add(new StreamRecord<>(c1, 2));
+		inputEvents.add(new StreamRecord<>(c2, 3));
+		inputEvents.add(new StreamRecord<>(d, 4));
+		inputEvents.add(new StreamRecord<>(b1, 5));
+
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("a");
+			}
+		}).followedByAny("middle").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("c");
+			}
+		}).followedByAny("end").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("d");
+			}
+		}).notNext("notPattern").where(new SimpleCondition<Event>() {
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("b");
+			}
+		});
+
+		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+		final List<List<Event>> matches = feedNFA(inputEvents, nfa);
+
+		assertEquals(0, matches.size());
+	}
+
+	@Test
+	public void testNotFollowedBy() {
+		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+		Event a1 = new Event(40, "a", 1.0);
+		Event c1 = new Event(41, "c", 2.0);
+		Event b1 = new Event(42, "b", 3.0);
+		Event c2 = new Event(43, "c", 4.0);
+		Event d = new Event(43, "d", 4.0);
+
+		inputEvents.add(new StreamRecord<>(a1, 1));
+		inputEvents.add(new StreamRecord<>(c1, 2));
+		inputEvents.add(new StreamRecord<>(b1, 3));
+		inputEvents.add(new StreamRecord<>(c2, 4));
+		inputEvents.add(new StreamRecord<>(d, 5));
+
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("a");
+			}
+		}).notFollowedBy("notPattern").where(new SimpleCondition<Event>() {
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("b");
+			}
+		}).followedByAny("middle").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("c");
+			}
+		}).followedBy("end").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("d");
+			}
+		});
+
+		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+		final List<List<Event>> matches = feedNFA(inputEvents, nfa);
+
+		compareMaps(matches,Lists.<List<Event>>newArrayList(
+			Lists.newArrayList(a1, c1, d)
+		));
+	}
+
+	@Test
+	public void testNotFollowedByBeforeOptional() {
+		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+		Event a1 = new Event(40, "a", 1.0);
+		Event c1 = new Event(41, "c", 2.0);
+		Event b1 = new Event(42, "b", 3.0);
+		Event c2 = new Event(43, "c", 4.0);
+		Event d = new Event(43, "d", 4.0);
+
+		inputEvents.add(new StreamRecord<>(a1, 1));
+		inputEvents.add(new StreamRecord<>(c1, 2));
+		inputEvents.add(new StreamRecord<>(b1, 3));
+		inputEvents.add(new StreamRecord<>(c2, 4));
+		inputEvents.add(new StreamRecord<>(d, 5));
+
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("a");
+			}
+		}).notFollowedBy("notPattern").where(new SimpleCondition<Event>() {
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("b");
+			}
+		}).followedByAny("middle").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("c");
+			}
+		}).optional().followedBy("end").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("d");
+			}
+		});
+
+		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+		final List<List<Event>> matches = feedNFA(inputEvents, nfa);
+
+		compareMaps(matches,Lists.<List<Event>>newArrayList(
+			Lists.newArrayList(a1, c1, d)
+		));
+	}
+
+	@Test
+	public void testTimesWithNotFollowedBy() {
+		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+		Event a1 = new Event(40, "a", 1.0);
+		Event b1 = new Event(41, "b", 2.0);
+		Event c = new Event(42, "c", 3.0);
+		Event b2 = new Event(43, "b", 4.0);
+		Event d = new Event(43, "d", 4.0);
+
+		inputEvents.add(new StreamRecord<>(a1, 1));
+		inputEvents.add(new StreamRecord<>(b1, 2));
+		inputEvents.add(new StreamRecord<>(c, 3));
+		inputEvents.add(new StreamRecord<>(b2, 4));
+		inputEvents.add(new StreamRecord<>(d, 5));
+
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("a");
+			}
+		}).followedByAny("middle").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("b");
+			}
+		}).times(2).notFollowedBy("notPattern").where(new SimpleCondition<Event>() {
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("c");
+			}
+		}).followedBy("end").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("d");
+			}
+		});
+
+		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+		final List<List<Event>> matches = feedNFA(inputEvents, nfa);
+
+		compareMaps(matches,Lists.<List<Event>>newArrayList());
+	}
+
+	@Test
+	public void testIgnoreStateOfTimesWithNotFollowedBy() {
+		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+		Event a1 = new Event(40, "a", 1.0);
+		Event e = new Event(41, "e", 2.0);
+		Event c1 = new Event(42, "c", 3.0);
+		Event b1 = new Event(43, "b", 4.0);
+		Event c2 = new Event(44, "c", 5.0);
+		Event d1 = new Event(45, "d", 6.0);
+		Event d2 = new Event(46, "d", 7.0);
+
+		inputEvents.add(new StreamRecord<>(a1, 1));
+		inputEvents.add(new StreamRecord<>(d1, 2));
+		inputEvents.add(new StreamRecord<>(e, 1));
+		inputEvents.add(new StreamRecord<>(b1, 3));
+		inputEvents.add(new StreamRecord<>(c1, 2));
+		inputEvents.add(new StreamRecord<>(c2, 4));
+		inputEvents.add(new StreamRecord<>(d2, 5));
+
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("a");
+			}
+		}).notFollowedBy("notPattern").where(new SimpleCondition<Event>() {
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("b");
+			}
+		}).followedByAny("middle").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("c");
+			}
+		}).times(2).optional().followedBy("end").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("d");
+			}
+		});
+
+		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+		final List<List<Event>> matches = feedNFA(inputEvents, nfa);
+
+		compareMaps(matches, Lists.<List<Event>>newArrayList(
+			Lists.newArrayList(a1, d1)
+		));
+	}
+
+	@Test
+	public void testTimesWithNotFollowedByAfter() {
+		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+		Event a1 = new Event(40, "a", 1.0);
+		Event e = new Event(41, "e", 2.0);
+		Event c1 = new Event(42, "c", 3.0);
+		Event b1 = new Event(43, "b", 4.0);
+		Event b2 = new Event(44, "b", 5.0);
+		Event d1 = new Event(46, "d", 7.0);
+		Event d2 = new Event(47, "d", 8.0);
+
+		inputEvents.add(new StreamRecord<>(a1, 1));
+		inputEvents.add(new StreamRecord<>(d1, 2));
+		inputEvents.add(new StreamRecord<>(e, 1));
+		inputEvents.add(new StreamRecord<>(b1, 3));
+		inputEvents.add(new StreamRecord<>(b2, 3));
+		inputEvents.add(new StreamRecord<>(c1, 2));
+		inputEvents.add(new StreamRecord<>(d2, 5));
+
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("a");
+			}
+		}).followedByAny("middle").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("b");
+			}
+		}).times(2).notFollowedBy("notPattern").where(new SimpleCondition<Event>() {
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("c");
+			}
+		}).followedBy("end").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("d");
+			}
+		});
+
+		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+		final List<List<Event>> matches = feedNFA(inputEvents, nfa);
+
+		compareMaps(matches, Lists.<List<Event>>newArrayList());
+	}
+
+	@Test
+	public void testNotFollowedByBeforeOptionalAtTheEnd() {
+		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+		Event a1 = new Event(40, "a", 1.0);
+		Event c1 = new Event(41, "c", 2.0);
+		Event b1 = new Event(42, "b", 3.0);
+		Event c2 = new Event(43, "c", 4.0);
+
+		inputEvents.add(new StreamRecord<>(a1, 1));
+		inputEvents.add(new StreamRecord<>(c1, 2));
+		inputEvents.add(new StreamRecord<>(b1, 3));
+		inputEvents.add(new StreamRecord<>(c2, 4));
+
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("a");
+			}
+		}).notFollowedBy("notPattern").where(new SimpleCondition<Event>() {
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("b");
+			}
+		}).followedByAny("end").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("c");
+			}
+		}).optional();
+
+		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+		final List<List<Event>> matches = feedNFA(inputEvents, nfa);
+
+		compareMaps(matches,Lists.<List<Event>>newArrayList(
+			Lists.newArrayList(a1, c1),
+			Lists.newArrayList(a1)
+		));
+	}
+
+	@Test
+	public void testNotFollowedByBeforeOptionalTimes() {
+		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+		Event a1 = new Event(40, "a", 1.0);
+		Event c1 = new Event(41, "c", 2.0);
+		Event b1 = new Event(42, "b", 3.0);
+		Event c2 = new Event(43, "c", 4.0);
+		Event d = new Event(43, "d", 4.0);
+
+		inputEvents.add(new StreamRecord<>(a1, 1));
+		inputEvents.add(new StreamRecord<>(c1, 2));
+		inputEvents.add(new StreamRecord<>(b1, 3));
+		inputEvents.add(new StreamRecord<>(c2, 4));
+		inputEvents.add(new StreamRecord<>(d, 5));
+
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("a");
+			}
+		}).notFollowedBy("notPattern").where(new SimpleCondition<Event>() {
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("b");
+			}
+		}).followedByAny("middle").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("c");
+			}
+		}).times(2).optional().followedBy("end").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("d");
+			}
+		});
+
+		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+		final List<List<Event>> matches = feedNFA(inputEvents, nfa);
+
+		compareMaps(matches,Lists.<List<Event>>newArrayList(
+			Lists.newArrayList(a1, c1, c2, d)
+		));
+	}
+
+	@Test
+	public void testNotFollowedByWithBranchingAtStart() {
+		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+		Event a1 = new Event(40, "a", 1.0);
+		Event b1 = new Event(42, "b", 3.0);
+		Event c1 = new Event(41, "c", 2.0);
+		Event a2 = new Event(41, "a", 4.0);
+		Event c2 = new Event(43, "c", 5.0);
+		Event d = new Event(43, "d", 6.0);
+
+		inputEvents.add(new StreamRecord<>(a1, 1));
+		inputEvents.add(new StreamRecord<>(b1, 2));
+		inputEvents.add(new StreamRecord<>(c1, 3));
+		inputEvents.add(new StreamRecord<>(a2, 4));
+		inputEvents.add(new StreamRecord<>(c2, 5));
+		inputEvents.add(new StreamRecord<>(d, 6));
+
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("a");
+			}
+		}).notFollowedBy("notPattern").where(new SimpleCondition<Event>() {
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("b");
+			}
+		}).followedBy("middle").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("c");
+			}
+		}).followedBy("end").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("d");
+			}
+		});
+
+		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+		final List<List<Event>> matches = feedNFA(inputEvents, nfa);
+
+		compareMaps(matches, Lists.<List<Event>>newArrayList(
+			Lists.newArrayList(a2, c2, d)
+		));
+	}
+
+	private static class NotFollowByData {
+		static final Event a1 = new Event(40, "a", 1.0);
+		static final Event b1 = new Event(41, "b", 2.0);
+		static final Event b2 = new Event(42, "b", 3.0);
+		static final Event b3 = new Event(42, "b", 4.0);
+		static final Event c1 = new Event(43, "c", 5.0);
+		static final Event b4 = new Event(42, "b", 6.0);
+		static final Event b5 = new Event(42, "b", 7.0);
+		static final Event b6 = new Event(42, "b", 8.0);
+		static final Event d1 = new Event(43, "d", 9.0);
+
+		private NotFollowByData() {
+		}
+	}
+
+	@Test
+	public void testNotNextAfterZeroOrMoreSkipTillNext() {
+		final List<List<Event>> matches = testNotNextAfterZeroOrMore(false);
+		compareMaps(matches, Lists.<List<Event>>newArrayList(
+			Lists.newArrayList(NotFollowByData.a1, NotFollowByData.d1)
+		));
+	}
+
+	@Test
+	public void testNotNextAfterZeroOrMoreSkipTillAny() {
+		final List<List<Event>> matches = testNotNextAfterZeroOrMore(true);
+		compareMaps(matches, Lists.<List<Event>>newArrayList(
+			Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b2, NotFollowByData.d1),
+			Lists.newArrayList(NotFollowByData.a1, NotFollowByData.d1)
+		));
+	}
+
+	private List<List<Event>> testNotNextAfterZeroOrMore(boolean allMatches) {
+		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+		int i = 0;
+		inputEvents.add(new StreamRecord<>(NotFollowByData.a1, i++));
+		inputEvents.add(new StreamRecord<>(NotFollowByData.b1, i++));
+		inputEvents.add(new StreamRecord<>(NotFollowByData.c1, i++));
+		inputEvents.add(new StreamRecord<>(NotFollowByData.b2, i++));
+		inputEvents.add(new StreamRecord<>(NotFollowByData.d1, i++));
+
+		Pattern<Event, ?> pattern = Pattern
+			.<Event>begin("a").where(new SimpleCondition<Event>() {
+				private static final long serialVersionUID = 5726188262756267490L;
+
+				@Override
+				public boolean filter(Event value) throws Exception {
+					return value.getName().equals("a");
+				}
+			});
+
+		pattern = (allMatches ? pattern.followedByAny("b*") : pattern.followedBy("b*")).where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("b");
+			}
+		}).oneOrMore().optional()
+			.notNext("not c").where(new SimpleCondition<Event>() {
+				private static final long serialVersionUID = 5726188262756267490L;
+
+				@Override
+				public boolean filter(Event value) throws Exception {
+					return value.getName().equals("c");
+				}
+			})
+			.followedBy("d").where(new SimpleCondition<Event>() {
+				private static final long serialVersionUID = 5726188262756267490L;
+
+				@Override
+				public boolean filter(Event value) throws Exception {
+					return value.getName().equals("d");
+				}
+			});
+
+		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+		return feedNFA(inputEvents, nfa);
+	}
+
+	@Test
+	public void testNotNextAfterOneOrMoreSkipTillNext() {
+		final List<List<Event>> matches = testNotNextAfterOneOrMore(false);
+		assertEquals(0, matches.size());
+	}
+
+	@Test
+	public void testNotNextAfterOneOrMoreSkipTillAny() {
+		final List<List<Event>> matches = testNotNextAfterOneOrMore(true);
+		compareMaps(matches, Lists.<List<Event>>newArrayList(
+			Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b2, NotFollowByData.d1)
+		));
+	}
+
+	private List<List<Event>> testNotNextAfterOneOrMore(boolean allMatches) {
+		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+		int i = 0;
+		inputEvents.add(new StreamRecord<>(NotFollowByData.a1, i++));
+		inputEvents.add(new StreamRecord<>(NotFollowByData.b1, i++));
+		inputEvents.add(new StreamRecord<>(NotFollowByData.c1, i++));
+		inputEvents.add(new StreamRecord<>(NotFollowByData.b2, i++));
+		inputEvents.add(new StreamRecord<>(NotFollowByData.d1, i++));
+
+		Pattern<Event, ?> pattern = Pattern
+			.<Event>begin("a").where(new SimpleCondition<Event>() {
+				private static final long serialVersionUID = 5726188262756267490L;
+
+				@Override
+				public boolean filter(Event value) throws Exception {
+					return value.getName().equals("a");
+				}
+			});
+
+		pattern = (allMatches ? pattern.followedByAny("b*") : pattern.followedBy("b*")).where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("b");
+			}
+		}).oneOrMore()
+			.notNext("not c").where(new SimpleCondition<Event>() {
+				private static final long serialVersionUID = 5726188262756267490L;
+
+				@Override
+				public boolean filter(Event value) throws Exception {
+					return value.getName().equals("c");
+				}
+			})
+			.followedBy("d").where(new SimpleCondition<Event>() {
+				private static final long serialVersionUID = 5726188262756267490L;
+
+				@Override
+				public boolean filter(Event value) throws Exception {
+					return value.getName().equals("d");
+				}
+			});
+
+		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+		return feedNFA(inputEvents, nfa);
+	}
+
+	@Test
+	public void testNotFollowedByNextAfterOneOrMoreEager() {
+		final List<List<Event>> matches = testNotFollowedByAfterOneOrMore(true, false);
+		assertEquals(0, matches.size());
+	}
+
+	@Test
+	public void testNotFollowedByAnyAfterOneOrMoreEager() {
+		final List<List<Event>> matches = testNotFollowedByAfterOneOrMore(true, true);
+		compareMaps(matches, Lists.<List<Event>>newArrayList(
+			Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b4, NotFollowByData.b5, NotFollowByData.b6, NotFollowByData.d1),
+			Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b4, NotFollowByData.b5, NotFollowByData.d1),
+			Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b4, NotFollowByData.d1),
+			Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b5, NotFollowByData.b6, NotFollowByData.d1),
+			Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b5, NotFollowByData.d1),
+			Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b6, NotFollowByData.d1)
+		));
+	}
+
+	@Test
+	public void testNotFollowedByNextAfterOneOrMoreCombinations() {
+		final List<List<Event>> matches = testNotFollowedByAfterOneOrMore(false, false);
+		assertEquals(0, matches.size());
+	}
+
+	@Test
+	public void testNotFollowedByAnyAfterOneOrMoreCombinations() {
+		final List<List<Event>> matches = testNotFollowedByAfterOneOrMore(false, true);
+		compareMaps(matches, Lists.<List<Event>>newArrayList(
+			Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b4, NotFollowByData.b5, NotFollowByData.b6, NotFollowByData.d1),
+			Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b4, NotFollowByData.b5, NotFollowByData.d1),
+			Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b4, NotFollowByData.b6, NotFollowByData.d1),
+			Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b4, NotFollowByData.d1),
+			Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b5, NotFollowByData.b6, NotFollowByData.d1),
+			Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b5, NotFollowByData.d1),
+			Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b6, NotFollowByData.d1)
+		));
+	}
+
+	private List<List<Event>> testNotFollowedByAfterOneOrMore(boolean eager, boolean allMatches) {
+		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+		int i = 0;
+		inputEvents.add(new StreamRecord<>(NotFollowByData.a1, i++));
+		inputEvents.add(new StreamRecord<>(NotFollowByData.b1, i++));
+		inputEvents.add(new StreamRecord<>(NotFollowByData.b2, i++));
+		inputEvents.add(new StreamRecord<>(NotFollowByData.b3, i++));
+		inputEvents.add(new StreamRecord<>(NotFollowByData.c1, i++));
+		inputEvents.add(new StreamRecord<>(NotFollowByData.b4, i++));
+		inputEvents.add(new StreamRecord<>(NotFollowByData.b5, i++));
+		inputEvents.add(new StreamRecord<>(NotFollowByData.b6, i++));
+		inputEvents.add(new StreamRecord<>(NotFollowByData.d1, i));
+
+		Pattern<Event, ?> pattern = Pattern
+			.<Event>begin("a").where(new SimpleCondition<Event>() {
+				private static final long serialVersionUID = 5726188262756267490L;
+
+				@Override
+				public boolean filter(Event value) throws Exception {
+					return value.getName().equals("a");
+				}
+			});
+
+		pattern = (allMatches ? pattern.followedByAny("b*") : pattern.followedBy("b*"))
+			.where(new SimpleCondition<Event>() {
+				private static final long serialVersionUID = 5726188262756267490L;
+
+				@Override
+				public boolean filter(Event value) throws Exception {
+					return value.getName().equals("b");
+				}
+			});
+
+		pattern = (eager ? pattern.oneOrMore() : pattern.oneOrMore().allowCombinations())
+			.notFollowedBy("not c").where(new SimpleCondition<Event>() {
+				private static final long serialVersionUID = 5726188262756267490L;
+
+				@Override
+				public boolean filter(Event value) throws Exception {
+					return value.getName().equals("c");
+				}
+			})
+			.followedBy("d").where(new SimpleCondition<Event>() {
+				private static final long serialVersionUID = 5726188262756267490L;
+
+				@Override
+				public boolean filter(Event value) throws Exception {
+					return value.getName().equals("d");
+				}
+			});
+
+		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+		return feedNFA(inputEvents, nfa);
+	}
+
+	@Test
+	public void testNotFollowedByAnyBeforeOneOrMoreEager() {
+		final List<List<Event>> matches = testNotFollowedByBeforeOneOrMore(true, true);
+
+		compareMaps(matches, Lists.<List<Event>>newArrayList(
+			Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.b4, NotFollowByData.b5, NotFollowByData.b6, NotFollowByData.d1),
+			Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.b4, NotFollowByData.b5, NotFollowByData.d1),
+			Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.b4, NotFollowByData.d1),
+			Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.d1)
+		));
+	}
+
+	@Test
+	public void testNotFollowedByAnyBeforeOneOrMoreCombinations() {
+		final List<List<Event>> matches = testNotFollowedByBeforeOneOrMore(false, true);
+
+		compareMaps(matches, Lists.<List<Event>>newArrayList(
+			Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.b4, NotFollowByData.b5, NotFollowByData.b6, NotFollowByData.d1),
+			Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.b4, NotFollowByData.b6, NotFollowByData.d1),
+			Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.b4, NotFollowByData.b5, NotFollowByData.d1),
+			Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.b4, NotFollowByData.d1),
+			Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.b5, NotFollowByData.b6, NotFollowByData.d1),
+			Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.b5, NotFollowByData.d1),
+			Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.b6, NotFollowByData.d1),
+			Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.d1)
+		));
+	}
+
+	@Test
+	public void testNotFollowedByBeforeOneOrMoreEager() {
+		final List<List<Event>> matches = testNotFollowedByBeforeOneOrMore(true, false);
+
+		compareMaps(matches, Lists.<List<Event>>newArrayList(
+			Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.b4, NotFollowByData.b5, NotFollowByData.b6, NotFollowByData.d1),
+			Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.b4, NotFollowByData.b5, NotFollowByData.d1),
+			Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.b4, NotFollowByData.d1),
+			Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.d1)
+		));
+	}
+
+	@Test
+	public void testNotFollowedByBeforeOneOrMoreCombinations() {
+		final List<List<Event>> matches = testNotFollowedByBeforeOneOrMore(false, false);
+
+		compareMaps(matches, Lists.<List<Event>>newArrayList(
+			Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.b4, NotFollowByData.b5, NotFollowByData.b6, NotFollowByData.d1),
+			Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.b4, NotFollowByData.b6, NotFollowByData.d1),
+			Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.b4, NotFollowByData.b5, NotFollowByData.d1),
+			Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.b4, NotFollowByData.d1),
+			Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.b5, NotFollowByData.b6, NotFollowByData.d1),
+			Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.b5, NotFollowByData.d1),
+			Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.b6, NotFollowByData.d1),
+			Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.d1)
+		));
+	}
+
+	private List<List<Event>> testNotFollowedByBeforeOneOrMore(boolean eager, boolean allMatches) {
+		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+		int i = 0;
+		inputEvents.add(new StreamRecord<>(NotFollowByData.a1, i++));
+		inputEvents.add(new StreamRecord<>(NotFollowByData.b1, i++));
+		inputEvents.add(new StreamRecord<>(NotFollowByData.c1, i++));
+		inputEvents.add(new StreamRecord<>(NotFollowByData.b4, i++));
+		inputEvents.add(new StreamRecord<>(NotFollowByData.b5, i++));
+		inputEvents.add(new StreamRecord<>(NotFollowByData.b6, i++));
+		inputEvents.add(new StreamRecord<>(NotFollowByData.d1, i));
+
+		Pattern<Event, ?> pattern = Pattern
+			.<Event>begin("a").where(new SimpleCondition<Event>() {
+				private static final long serialVersionUID = 5726188262756267490L;
+
+				@Override
+				public boolean filter(Event value) throws Exception {
+					return value.getName().equals("a");
+				}
+			})
+			.notFollowedBy("not c").where(new SimpleCondition<Event>() {
+				private static final long serialVersionUID = 5726188262756267490L;
+
+				@Override
+				public boolean filter(Event value) throws Exception {
+					return value.getName().equals("c");
+				}
+			});
+
+		pattern = (allMatches ? pattern.followedByAny("b*") : pattern.followedBy("b*"))
+			.where(new SimpleCondition<Event>() {
+				private static final long serialVersionUID = 5726188262756267490L;
+
+				@Override
+				public boolean filter(Event value) throws Exception {
+					return value.getName().equals("b");
+				}
+			}).oneOrMore();
+
+		pattern = (eager ? pattern : pattern.allowCombinations())
+			.followedBy("d").where(new SimpleCondition<Event>() {
+				private static final long serialVersionUID = 5726188262756267490L;
+
+				@Override
+				public boolean filter(Event value) throws Exception {
+					return value.getName().equals("d");
+				}
+			});
+
+		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+		return feedNFA(inputEvents, nfa);
+	}
+
+	@Test
+	public void testNotFollowedByBeforeZeroOrMoreEagerSkipTillNext() {
+		final List<List<Event>> matches = testNotFollowedByBeforeZeroOrMore(true, false);
+		compareMaps(matches, Lists.<List<Event>>newArrayList(
+			Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.b4, NotFollowByData.b5, NotFollowByData.b6, NotFollowByData.d1),
+			Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.b4, NotFollowByData.b5, NotFollowByData.d1),
+			Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.b4, NotFollowByData.d1),
+			Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.d1)
+		));
+	}
+
+	@Test
+	public void testNotFollowedByBeforeZeroOrMoreCombinationsSkipTillNext() {
+		final List<List<Event>> matches = testNotFollowedByBeforeZeroOrMore(false, false);
+		compareMaps(matches, Lists.<List<Event>>newArrayList(
+			Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.b4, NotFollowByData.b5, NotFollowByData.b6, NotFollowByData.d1),
+			Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.b4, NotFollowByData.b5, NotFollowByData.d1),
+			Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.b4, NotFollowByData.b6, NotFollowByData.d1),
+			Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.b4, NotFollowByData.d1),
+			Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.d1),
+			Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.b5, NotFollowByData.b6, NotFollowByData.d1),
+			Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.b5, NotFollowByData.d1),
+			Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.b6, NotFollowByData.d1)
+		));
+	}
+
+	@Test
+	public void testNotFollowedByBeforeZeroOrMoreEagerSkipTillAny() {
+		final List<List<Event>> matches = testNotFollowedByBeforeZeroOrMore(true, true);
+		compareMaps(matches, Lists.<List<Event>>newArrayList(
+			Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.b4, NotFollowByData.b5, NotFollowByData.b6, NotFollowByData.d1),
+			Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.b4, NotFollowByData.b5, NotFollowByData.d1),
+			Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.b4, NotFollowByData.d1),
+			Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.d1)
+		));
+	}
+
+	@Test
+	public void testNotFollowedByBeforeZeroOrMoreCombinationsSkipTillAny() {
+		final List<List<Event>> matches = testNotFollowedByBeforeZeroOrMore(false, true);
+		compareMaps(matches, Lists.<List<Event>>newArrayList(
+			Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.b4, NotFollowByData.b5, NotFollowByData.b6, NotFollowByData.d1),
+			Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.b4, NotFollowByData.b5, NotFollowByData.d1),
+			Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.b4, NotFollowByData.b6, NotFollowByData.d1),
+			Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.b4, NotFollowByData.d1),
+			Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.d1),
+			Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.b5, NotFollowByData.b6, NotFollowByData.d1),
+			Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.b5, NotFollowByData.d1),
+			Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.b6, NotFollowByData.d1)
+		));
+	}
+
+	private List<List<Event>> testNotFollowedByBeforeZeroOrMore(boolean eager, boolean allMatches) {
+		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+		int i = 0;
+		inputEvents.add(new StreamRecord<>(NotFollowByData.a1, i++));
+		inputEvents.add(new StreamRecord<>(NotFollowByData.b1, i++));
+		inputEvents.add(new StreamRecord<>(NotFollowByData.c1, i++));
+		inputEvents.add(new StreamRecord<>(NotFollowByData.b4, i++));
+		inputEvents.add(new StreamRecord<>(NotFollowByData.b5, i++));
+		inputEvents.add(new StreamRecord<>(NotFollowByData.b6, i++));
+		inputEvents.add(new StreamRecord<>(NotFollowByData.d1, i));
+
+		Pattern<Event, ?> pattern = Pattern
+			.<Event>begin("a").where(new SimpleCondition<Event>() {
+				private static final long serialVersionUID = 5726188262756267490L;
+
+				@Override
+				public boolean filter(Event value) throws Exception {
+					return value.getName().equals("a");
+				}
+			})
+			.notFollowedBy("not c").where(new SimpleCondition<Event>() {
+				private static final long serialVersionUID = 5726188262756267490L;
+
+				@Override
+				public boolean filter(Event value) throws Exception {
+					return value.getName().equals("c");
+				}
+			});
+
+		pattern = (allMatches ? pattern.followedByAny("b*") : pattern.followedBy("b*"))
+			.where(new SimpleCondition<Event>() {
+				private static final long serialVersionUID = 5726188262756267490L;
+
+				@Override
+				public boolean filter(Event value) throws Exception {
+					return value.getName().equals("b");
+				}
+			}).oneOrMore().optional();
+
+		pattern = (eager ? pattern : pattern.allowCombinations())
+			.followedBy("d").where(new SimpleCondition<Event>() {
+				private static final long serialVersionUID = 5726188262756267490L;
+
+				@Override
+				public boolean filter(Event value) throws Exception {
+					return value.getName().equals("d");
+				}
+			});
+
+		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+		return feedNFA(inputEvents, nfa);
+	}
+
+	/////////////////////////////////////////       Utility        /////////////////////////////////////////////////
+
 	private List<List<Event>> feedNFA(List<StreamRecord<Event>> inputEvents, NFA<Event> nfa) {
 		List<List<Event>> resultingPatterns = new ArrayList<>();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/5795ebe1/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java
index ced9efe..90a6321 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java
@@ -87,6 +87,21 @@ public class NFACompilerTest extends TestLogger {
 		NFACompiler.compile(invalidPattern, Event.createTypeSerializer(), false);
 	}
 
+	@Test
+	public void testNFACompilerPatternEndsWithNotFollowedBy() {
+
+		// adjust the rule
+		expectedException.expect(MalformedPatternException.class);
+		expectedException.expectMessage("NotFollowedBy is not supported as a last part of a Pattern!");
+
+		Pattern<Event, ?> invalidPattern = Pattern.<Event>begin("start").where(new TestFilter())
+			.followedBy("middle").where(new TestFilter())
+			.notFollowedBy("end").where(new TestFilter());
+
+		// here we must have an exception because of the two "start" patterns with the same name.
+		NFACompiler.compile(invalidPattern, Event.createTypeSerializer(), false);
+	}
+
 	/**
 	 * A filter implementation to test invalid pattern specification with
 	 * duplicate pattern names. Check {@link #testNFACompilerUniquePatternName()}.
@@ -149,6 +164,26 @@ public class NFACompilerTest extends TestLogger {
 		assertEquals(0, endingState.getStateTransitions().size());
 	}
 
+	@Test
+	public void testNoUnnecessaryStateCopiesCreated() {
+		final Pattern<Event, Event> pattern = Pattern.<Event>begin("start").where(startFilter)
+			.notFollowedBy("not").where(startFilter)
+			.followedBy("oneOrMore").where(startFilter).oneOrMore()
+			.followedBy("end").where(endFilter);
+
+		final NFACompiler.NFAFactoryCompiler<Event> nfaFactoryCompiler = new NFACompiler.NFAFactoryCompiler<>(pattern);
+		nfaFactoryCompiler.compileFactory();
+
+		int endStateCount = 0;
+		for (State<Event> state : nfaFactoryCompiler.getStates()) {
+			if (state.getName().equals("end")) {
+				endStateCount++;
+			}
+		}
+
+		assertEquals(1, endStateCount);
+	}
+
 	private <T> Set<Tuple2<String, StateTransitionAction>> unfoldTransitions(final State<T> state) {
 		final Set<Tuple2<String, StateTransitionAction>> transitions = new HashSet<>();
 		for (StateTransition<T> transition : state.getStateTransitions()) {


Mime
View raw message