flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ches...@apache.org
Subject [1/3] flink git commit: [FLINK-6137] Activate strict checkstyle for flink-cep
Date Sun, 28 May 2017 08:51:57 GMT
Repository: flink
Updated Branches:
  refs/heads/master 4f50dc4df -> c9e574bf3


http://git-wip-us.apache.org/repos/asf/flink/blob/c9e574bf/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATestUtilities.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATestUtilities.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATestUtilities.java
new file mode 100644
index 0000000..7bf0767
--- /dev/null
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATestUtilities.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.nfa;
+
+import org.apache.flink.cep.Event;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import com.google.common.primitives.Doubles;
+import org.junit.Assert;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Utility class for IT tests of {@link NFA}: feeds events into an NFA and compares the resulting matches.
+ */
+public class NFATestUtilities {
+
+	public static List<List<Event>> feedNFA(List<StreamRecord<Event>> inputEvents, NFA<Event> nfa) {
+		List<List<Event>> resultingPatterns = new ArrayList<>();
+
+		for (StreamRecord<Event> inputEvent : inputEvents) {
+			Collection<Map<String, List<Event>>> patterns = nfa.process(
+				inputEvent.getValue(),
+				inputEvent.getTimestamp()).f0;
+
+			for (Map<String, List<Event>> p: patterns) {
+				List<Event> res = new ArrayList<>();
+				for (List<Event> le: p.values()) {
+					res.addAll(le);
+				}
+				resultingPatterns.add(res);
+			}
+		}
+		return resultingPatterns;
+	}
+
+	public static void compareMaps(List<List<Event>> actual, List<List<Event>> expected) {
+		Assert.assertEquals(expected.size(), actual.size());
+
+		for (List<Event> p: actual) {
+			Collections.sort(p, new EventComparator());
+		}
+
+		for (List<Event> p: expected) {
+			Collections.sort(p, new EventComparator());
+		}
+
+		Collections.sort(actual, new ListEventComparator());
+		Collections.sort(expected, new ListEventComparator());
+		Assert.assertArrayEquals(expected.toArray(), actual.toArray());
+	}
+
+	private static class ListEventComparator implements Comparator<List<Event>> {
+
+		@Override
+		public int compare(List<Event> o1, List<Event> o2) {
+			int sizeComp = Integer.compare(o1.size(), o2.size());
+			if (sizeComp == 0) {
+				EventComparator comp = new EventComparator();
+				for (int i = 0; i < o1.size(); i++) {
+					int eventComp = comp.compare(o1.get(i), o2.get(i));
+					if (eventComp != 0) {
+						return eventComp;
+					}
+				}
+				return 0;
+			} else {
+				return sizeComp;
+			}
+		}
+	}
+
+	private static class EventComparator implements Comparator<Event> {
+
+		@Override
+		public int compare(Event o1, Event o2) {
+			int nameComp = o1.getName().compareTo(o2.getName());
+			int priceComp = Doubles.compare(o1.getPrice(), o2.getPrice());
+			int idComp = Integer.compare(o1.getId(), o2.getId());
+			if (nameComp == 0) {
+				if (priceComp == 0) {
+					return idComp;
+				} else {
+					return priceComp;
+				}
+			} else {
+				return nameComp;
+			}
+		}
+	}
+
+	private NFATestUtilities() {
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c9e574bf/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NotPatternITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NotPatternITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NotPatternITCase.java
new file mode 100644
index 0000000..3b95eb4
--- /dev/null
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NotPatternITCase.java
@@ -0,0 +1,1036 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.nfa;
+
+import org.apache.flink.cep.Event;
+import org.apache.flink.cep.nfa.compiler.NFACompiler;
+import org.apache.flink.cep.pattern.Pattern;
+import org.apache.flink.cep.pattern.conditions.SimpleCondition;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.TestLogger;
+
+import com.google.common.collect.Lists;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.flink.cep.nfa.NFATestUtilities.compareMaps;
+import static org.apache.flink.cep.nfa.NFATestUtilities.feedNFA;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link Pattern#notFollowedBy(String)} and {@link Pattern#notNext(String)}.
+ */
+@SuppressWarnings("unchecked")
+public class NotPatternITCase extends TestLogger {
+
+	@Test
+	public void testNotNext() {
+		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+		Event a1 = new Event(40, "a", 1.0);
+		Event c1 = new Event(41, "c", 2.0);
+		Event b1 = new Event(42, "b", 3.0);
+		Event c2 = new Event(43, "c", 4.0);
+		Event d = new Event(43, "d", 4.0);
+
+		inputEvents.add(new StreamRecord<>(a1, 1));
+		inputEvents.add(new StreamRecord<>(c1, 2));
+		inputEvents.add(new StreamRecord<>(b1, 3));
+		inputEvents.add(new StreamRecord<>(c2, 4));
+		inputEvents.add(new StreamRecord<>(d, 5));
+
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5167288560432018992L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("a");
+			}
+		}).notNext("notPattern").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 2242479288129905510L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("b");
+			}
+		}).followedByAny("middle").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 1404509325548220892L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("c");
+			}
+		}).followedBy("end").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = -8907427230007830915L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("d");
+			}
+		});
+
+		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+		final List<List<Event>> matches = feedNFA(inputEvents, nfa);
+
+		compareMaps(matches, Lists.<List<Event>>newArrayList(
+			Lists.newArrayList(a1, c1, d),
+			Lists.newArrayList(a1, c2, d)
+		));
+	}
+
+	@Test
+	public void testNotNextNoMatches() {
+		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+		Event a1 = new Event(40, "a", 1.0);
+		Event b1 = new Event(42, "b", 3.0);
+		Event c1 = new Event(41, "c", 2.0);
+		Event c2 = new Event(43, "c", 4.0);
+		Event d = new Event(43, "d", 4.0);
+
+		inputEvents.add(new StreamRecord<>(a1, 1));
+		inputEvents.add(new StreamRecord<>(b1, 2));
+		inputEvents.add(new StreamRecord<>(c1, 3));
+		inputEvents.add(new StreamRecord<>(c2, 4));
+		inputEvents.add(new StreamRecord<>(d, 5));
+
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = -339500190577666439L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("a");
+			}
+		}).notNext("notPattern").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = -6913980632538046451L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("b");
+			}
+		}).followedBy("middle").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 3332196998905139891L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("c");
+			}
+		}).followedBy("end").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 2086563479959018387L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("d");
+			}
+		});
+
+		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+		final List<List<Event>> matches = feedNFA(inputEvents, nfa);
+
+		assertEquals(0, matches.size());
+	}
+
+	@Test
+	public void testNotNextNoMatchesAtTheEnd() {
+		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+		Event a1 = new Event(40, "a", 1.0);
+		Event c1 = new Event(41, "c", 2.0);
+		Event c2 = new Event(43, "c", 4.0);
+		Event d = new Event(43, "d", 4.0);
+		Event b1 = new Event(42, "b", 3.0);
+
+		inputEvents.add(new StreamRecord<>(a1, 1));
+		inputEvents.add(new StreamRecord<>(c1, 2));
+		inputEvents.add(new StreamRecord<>(c2, 3));
+		inputEvents.add(new StreamRecord<>(d, 4));
+		inputEvents.add(new StreamRecord<>(b1, 5));
+
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 1672995058886176627L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("a");
+			}
+		}).followedByAny("middle").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 6003621617520261554L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("c");
+			}
+		}).followedByAny("end").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 887700237024758417L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("d");
+			}
+		}).notNext("notPattern").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5239529076086933032L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("b");
+			}
+		});
+
+		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+		final List<List<Event>> matches = feedNFA(inputEvents, nfa);
+
+		assertEquals(0, matches.size());
+	}
+
+	@Test
+	public void testNotFollowedBy() {
+		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+		Event a1 = new Event(40, "a", 1.0);
+		Event c1 = new Event(41, "c", 2.0);
+		Event b1 = new Event(42, "b", 3.0);
+		Event c2 = new Event(43, "c", 4.0);
+		Event d = new Event(43, "d", 4.0);
+
+		inputEvents.add(new StreamRecord<>(a1, 1));
+		inputEvents.add(new StreamRecord<>(c1, 2));
+		inputEvents.add(new StreamRecord<>(b1, 3));
+		inputEvents.add(new StreamRecord<>(c2, 4));
+		inputEvents.add(new StreamRecord<>(d, 5));
+
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = -2641662468313191976L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("a");
+			}
+		}).notFollowedBy("notPattern").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = -3632144132379494778L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("b");
+			}
+		}).followedByAny("middle").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 3818766882138348167L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("c");
+			}
+		}).followedBy("end").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 2033204730795451288L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("d");
+			}
+		});
+
+		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+		final List<List<Event>> matches = feedNFA(inputEvents, nfa);
+
+		compareMaps(matches, Lists.<List<Event>>newArrayList(
+			Lists.newArrayList(a1, c1, d)
+		));
+	}
+
+	@Test
+	public void testNotFollowedByBeforeOptional() {
+		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+		Event a1 = new Event(40, "a", 1.0);
+		Event c1 = new Event(41, "c", 2.0);
+		Event b1 = new Event(42, "b", 3.0);
+		Event c2 = new Event(43, "c", 4.0);
+		Event d = new Event(43, "d", 4.0);
+
+		inputEvents.add(new StreamRecord<>(a1, 1));
+		inputEvents.add(new StreamRecord<>(c1, 2));
+		inputEvents.add(new StreamRecord<>(b1, 3));
+		inputEvents.add(new StreamRecord<>(c2, 4));
+		inputEvents.add(new StreamRecord<>(d, 5));
+
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = -2454396370205097543L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("a");
+			}
+		}).notFollowedBy("notPattern").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 2749547391611263290L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("b");
+			}
+		}).followedByAny("middle").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = -4989511337298217255L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("c");
+			}
+		}).optional().followedBy("end").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = -8466223836652936608L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("d");
+			}
+		});
+
+		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+		final List<List<Event>> matches = feedNFA(inputEvents, nfa);
+
+		compareMaps(matches, Lists.<List<Event>>newArrayList(
+			Lists.newArrayList(a1, c1, d)
+		));
+	}
+
+	@Test
+	public void testTimesWithNotFollowedBy() {
+		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+		Event a1 = new Event(40, "a", 1.0);
+		Event b1 = new Event(41, "b", 2.0);
+		Event c = new Event(42, "c", 3.0);
+		Event b2 = new Event(43, "b", 4.0);
+		Event d = new Event(43, "d", 4.0);
+
+		inputEvents.add(new StreamRecord<>(a1, 1));
+		inputEvents.add(new StreamRecord<>(b1, 2));
+		inputEvents.add(new StreamRecord<>(c, 3));
+		inputEvents.add(new StreamRecord<>(b2, 4));
+		inputEvents.add(new StreamRecord<>(d, 5));
+
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = -2568839911852184515L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("a");
+			}
+		}).followedByAny("middle").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = -3632232424064269636L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("b");
+			}
+		}).times(2).notFollowedBy("notPattern").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 3685596793523534611L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("c");
+			}
+		}).followedBy("end").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 1960758663575587243L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("d");
+			}
+		});
+
+		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+		final List<List<Event>> matches = feedNFA(inputEvents, nfa);
+
+		compareMaps(matches, Lists.<List<Event>>newArrayList());
+	}
+
+	@Test
+	public void testIgnoreStateOfTimesWithNotFollowedBy() {
+		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+		Event a1 = new Event(40, "a", 1.0);
+		Event e = new Event(41, "e", 2.0);
+		Event c1 = new Event(42, "c", 3.0);
+		Event b1 = new Event(43, "b", 4.0);
+		Event c2 = new Event(44, "c", 5.0);
+		Event d1 = new Event(45, "d", 6.0);
+		Event d2 = new Event(46, "d", 7.0);
+
+		inputEvents.add(new StreamRecord<>(a1, 1));
+		inputEvents.add(new StreamRecord<>(d1, 2));
+		inputEvents.add(new StreamRecord<>(e, 1));
+		inputEvents.add(new StreamRecord<>(b1, 3));
+		inputEvents.add(new StreamRecord<>(c1, 2));
+		inputEvents.add(new StreamRecord<>(c2, 4));
+		inputEvents.add(new StreamRecord<>(d2, 5));
+
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 2814850350025111940L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("a");
+			}
+		}).notFollowedBy("notPattern").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 4988756153568853834L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("b");
+			}
+		}).followedByAny("middle").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = -225909103322018778L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("c");
+			}
+		}).times(2).optional().followedBy("end").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = -924294627956373696L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("d");
+			}
+		});
+
+		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+		final List<List<Event>> matches = feedNFA(inputEvents, nfa);
+
+		compareMaps(matches, Lists.<List<Event>>newArrayList(
+			Lists.newArrayList(a1, d1)
+		));
+	}
+
+	@Test
+	public void testTimesWithNotFollowedByAfter() {
+		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+		Event a1 = new Event(40, "a", 1.0);
+		Event e = new Event(41, "e", 2.0);
+		Event c1 = new Event(42, "c", 3.0);
+		Event b1 = new Event(43, "b", 4.0);
+		Event b2 = new Event(44, "b", 5.0);
+		Event d1 = new Event(46, "d", 7.0);
+		Event d2 = new Event(47, "d", 8.0);
+
+		inputEvents.add(new StreamRecord<>(a1, 1));
+		inputEvents.add(new StreamRecord<>(d1, 2));
+		inputEvents.add(new StreamRecord<>(e, 1));
+		inputEvents.add(new StreamRecord<>(b1, 3));
+		inputEvents.add(new StreamRecord<>(b2, 3));
+		inputEvents.add(new StreamRecord<>(c1, 2));
+		inputEvents.add(new StreamRecord<>(d2, 5));
+
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 6193105689601702341L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("a");
+			}
+		}).followedByAny("middle").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5195859580923169111L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("b");
+			}
+		}).times(2).notFollowedBy("notPattern").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 4973027956103783831L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("c");
+			}
+		}).followedBy("end").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 2724622546678984894L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("d");
+			}
+		});
+
+		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+		final List<List<Event>> matches = feedNFA(inputEvents, nfa);
+
+		compareMaps(matches, Lists.<List<Event>>newArrayList());
+	}
+
+	@Test
+	public void testNotFollowedByBeforeOptionalAtTheEnd() {
+		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+		Event a1 = new Event(40, "a", 1.0);
+		Event c1 = new Event(41, "c", 2.0);
+		Event b1 = new Event(42, "b", 3.0);
+		Event c2 = new Event(43, "c", 4.0);
+
+		inputEvents.add(new StreamRecord<>(a1, 1));
+		inputEvents.add(new StreamRecord<>(c1, 2));
+		inputEvents.add(new StreamRecord<>(b1, 3));
+		inputEvents.add(new StreamRecord<>(c2, 4));
+
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = -4289351792573443294L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("a");
+			}
+		}).notFollowedBy("notPattern").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = -4989574608417523507L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("b");
+			}
+		}).followedByAny("end").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = -5940131818629290579L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("c");
+			}
+		}).optional();
+
+		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+		final List<List<Event>> matches = feedNFA(inputEvents, nfa);
+
+		compareMaps(matches, Lists.<List<Event>>newArrayList(
+			Lists.newArrayList(a1, c1),
+			Lists.newArrayList(a1)
+		));
+	}
+
+	@Test
+	public void testNotFollowedByBeforeOptionalTimes() {
+		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+		Event a1 = new Event(40, "a", 1.0);
+		Event c1 = new Event(41, "c", 2.0);
+		Event b1 = new Event(42, "b", 3.0);
+		Event c2 = new Event(43, "c", 4.0);
+		Event d = new Event(43, "d", 4.0);
+
+		inputEvents.add(new StreamRecord<>(a1, 1));
+		inputEvents.add(new StreamRecord<>(c1, 2));
+		inputEvents.add(new StreamRecord<>(b1, 3));
+		inputEvents.add(new StreamRecord<>(c2, 4));
+		inputEvents.add(new StreamRecord<>(d, 5));
+
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = -7885381452276160322L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("a");
+			}
+		}).notFollowedBy("notPattern").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 3471511260235826653L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("b");
+			}
+		}).followedByAny("middle").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 9073793782452363833L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("c");
+			}
+		}).times(2).optional().followedBy("end").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 7972902718259767076L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("d");
+			}
+		});
+
+		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+		final List<List<Event>> matches = feedNFA(inputEvents, nfa);
+
+		compareMaps(matches, Lists.<List<Event>>newArrayList(
+			Lists.newArrayList(a1, c1, c2, d)
+		));
+	}
+
+	@Test
+	public void testNotFollowedByWithBranchingAtStart() {
+		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+		Event a1 = new Event(40, "a", 1.0);
+		Event b1 = new Event(42, "b", 3.0);
+		Event c1 = new Event(41, "c", 2.0);
+		Event a2 = new Event(41, "a", 4.0);
+		Event c2 = new Event(43, "c", 5.0);
+		Event d = new Event(43, "d", 6.0);
+
+		inputEvents.add(new StreamRecord<>(a1, 1));
+		inputEvents.add(new StreamRecord<>(b1, 2));
+		inputEvents.add(new StreamRecord<>(c1, 3));
+		inputEvents.add(new StreamRecord<>(a2, 4));
+		inputEvents.add(new StreamRecord<>(c2, 5));
+		inputEvents.add(new StreamRecord<>(d, 6));
+
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = -7866220136345465444L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("a");
+			}
+		}).notFollowedBy("notPattern").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 4957837489028234932L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("b");
+			}
+		}).followedBy("middle").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5569569968862808007L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("c");
+			}
+		}).followedBy("end").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = -8579678167937416269L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("d");
+			}
+		});
+
+		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+		final List<List<Event>> matches = feedNFA(inputEvents, nfa);
+
+		compareMaps(matches, Lists.<List<Event>>newArrayList(
+			Lists.newArrayList(a2, c2, d)
+		));
+	}
+
+	private static class NotFollowByData {
+		static final Event A_1 = new Event(40, "a", 1.0);
+		static final Event B_1 = new Event(41, "b", 2.0);
+		static final Event B_2 = new Event(42, "b", 3.0);
+		static final Event B_3 = new Event(42, "b", 4.0);
+		static final Event C_1 = new Event(43, "c", 5.0);
+		static final Event B_4 = new Event(42, "b", 6.0);
+		static final Event B_5 = new Event(42, "b", 7.0);
+		static final Event B_6 = new Event(42, "b", 8.0);
+		static final Event D_1 = new Event(43, "d", 9.0);
+
+		private NotFollowByData() {
+		}
+	}
+
+	@Test
+	public void testNotNextAfterOneOrMoreSkipTillNext() {
+		final List<List<Event>> matches = testNotNextAfterOneOrMore(false);
+		assertEquals(0, matches.size());
+	}
+
+	@Test
+	public void testNotNextAfterOneOrMoreSkipTillAny() {
+		final List<List<Event>> matches = testNotNextAfterOneOrMore(true);
+		compareMaps(matches, Lists.<List<Event>>newArrayList(
+			Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_2, NotFollowByData.D_1)
+		));
+	}
+
+	private List<List<Event>> testNotNextAfterOneOrMore(boolean allMatches) {
+		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+		int i = 0;
+		inputEvents.add(new StreamRecord<>(NotFollowByData.A_1, i++));
+		inputEvents.add(new StreamRecord<>(NotFollowByData.B_1, i++));
+		inputEvents.add(new StreamRecord<>(NotFollowByData.C_1, i++));
+		inputEvents.add(new StreamRecord<>(NotFollowByData.B_2, i++));
+		inputEvents.add(new StreamRecord<>(NotFollowByData.D_1, i++));
+
+		Pattern<Event, ?> pattern = Pattern
+			.<Event>begin("a").where(new SimpleCondition<Event>() {
+				private static final long serialVersionUID = 5726188262756267490L;
+
+				@Override
+				public boolean filter(Event value) throws Exception {
+					return value.getName().equals("a");
+				}
+			});
+
+		pattern = (allMatches ? pattern.followedByAny("b*") : pattern.followedBy("b*")).where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("b");
+			}
+		}).oneOrMore()
+			.notNext("not c").where(new SimpleCondition<Event>() {
+				private static final long serialVersionUID = 5726188262756267490L;
+
+				@Override
+				public boolean filter(Event value) throws Exception {
+					return value.getName().equals("c");
+				}
+			})
+			.followedBy("d").where(new SimpleCondition<Event>() {
+				private static final long serialVersionUID = 5726188262756267490L;
+
+				@Override
+				public boolean filter(Event value) throws Exception {
+					return value.getName().equals("d");
+				}
+			});
+
+		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+		return feedNFA(inputEvents, nfa);
+	}
+
+	@Test
+	public void testNotFollowedByNextAfterOneOrMoreEager() {
+		final List<List<Event>> matches = testNotFollowedByAfterOneOrMore(true, false);
+		assertEquals(0, matches.size());
+	}
+
+	@Test
+	public void testNotFollowedByAnyAfterOneOrMoreEager() {
+		final List<List<Event>> matches = testNotFollowedByAfterOneOrMore(true, true);
+		compareMaps(matches, Lists.<List<Event>>newArrayList(
+			Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_4, NotFollowByData.B_5, NotFollowByData.B_6, NotFollowByData.D_1),
+			Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_4, NotFollowByData.B_5, NotFollowByData.D_1),
+			Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_4, NotFollowByData.D_1),
+			Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_5, NotFollowByData.B_6, NotFollowByData.D_1),
+			Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_5, NotFollowByData.D_1),
+			Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_6, NotFollowByData.D_1)
+		));
+	}
+
+	@Test
+	public void testNotFollowedByNextAfterOneOrMoreCombinations() {
+		final List<List<Event>> matches = testNotFollowedByAfterOneOrMore(false, false);
+		assertEquals(0, matches.size());
+	}
+
+	@Test
+	public void testNotFollowedByAnyAfterOneOrMoreCombinations() {
+		final List<List<Event>> matches = testNotFollowedByAfterOneOrMore(false, true);
+		compareMaps(matches, Lists.<List<Event>>newArrayList(
+			Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_4, NotFollowByData.B_5, NotFollowByData.B_6, NotFollowByData.D_1),
+			Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_4, NotFollowByData.B_5, NotFollowByData.D_1),
+			Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_4, NotFollowByData.B_6, NotFollowByData.D_1),
+			Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_4, NotFollowByData.D_1),
+			Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_5, NotFollowByData.B_6, NotFollowByData.D_1),
+			Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_5, NotFollowByData.D_1),
+			Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_6, NotFollowByData.D_1)
+		));
+	}
+
+	private List<List<Event>> testNotFollowedByAfterOneOrMore(boolean eager, boolean allMatches) {
+		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+		// Nine records: A_1, B_1..B_3, C_1, B_4..B_6, D_1, stamped with consecutive timestamps 0..8.
+		int i = 0;
+		inputEvents.add(new StreamRecord<>(NotFollowByData.A_1, i++));
+		inputEvents.add(new StreamRecord<>(NotFollowByData.B_1, i++));
+		inputEvents.add(new StreamRecord<>(NotFollowByData.B_2, i++));
+		inputEvents.add(new StreamRecord<>(NotFollowByData.B_3, i++));
+		inputEvents.add(new StreamRecord<>(NotFollowByData.C_1, i++));
+		inputEvents.add(new StreamRecord<>(NotFollowByData.B_4, i++));
+		inputEvents.add(new StreamRecord<>(NotFollowByData.B_5, i++));
+		inputEvents.add(new StreamRecord<>(NotFollowByData.B_6, i++));
+		inputEvents.add(new StreamRecord<>(NotFollowByData.D_1, i));
+		// First stage "a": accepts events whose name is "a".
+		Pattern<Event, ?> pattern = Pattern
+			.<Event>begin("a").where(new SimpleCondition<Event>() {
+				private static final long serialVersionUID = 5726188262756267490L;
+
+				@Override
+				public boolean filter(Event value) throws Exception {
+					return value.getName().equals("a");
+				}
+			});
+		// Stage "b*" (name "b"): allMatches chooses followedByAny over followedBy as the link from "a".
+		pattern = (allMatches ? pattern.followedByAny("b*") : pattern.followedBy("b*"))
+			.where(new SimpleCondition<Event>() {
+				private static final long serialVersionUID = 5726188262756267490L;
+
+				@Override
+				public boolean filter(Event value) throws Exception {
+					return value.getName().equals("b");
+				}
+			});
+		// eager keeps plain oneOrMore(), otherwise allowCombinations() is added; "not c" (name "c") and "d" (name "d") follow.
+		pattern = (eager ? pattern.oneOrMore() : pattern.oneOrMore().allowCombinations())
+			.notFollowedBy("not c").where(new SimpleCondition<Event>() {
+				private static final long serialVersionUID = 5726188262756267490L;
+
+				@Override
+				public boolean filter(Event value) throws Exception {
+					return value.getName().equals("c");
+				}
+			})
+			.followedBy("d").where(new SimpleCondition<Event>() {
+				private static final long serialVersionUID = 5726188262756267490L;
+
+				@Override
+				public boolean filter(Event value) throws Exception {
+					return value.getName().equals("d");
+				}
+			});
+
+		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+		return feedNFA(inputEvents, nfa);
+	}
+
+	@Test
+	public void testNotFollowedByAnyBeforeOneOrMoreEager() {
+		final List<List<Event>> matches = testNotFollowedByBeforeOneOrMore(true, true);
+
+		compareMaps(matches, Lists.<List<Event>>newArrayList(
+			Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.B_4, NotFollowByData.B_5, NotFollowByData.B_6, NotFollowByData.D_1),
+			Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.B_4, NotFollowByData.B_5, NotFollowByData.D_1),
+			Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.B_4, NotFollowByData.D_1),
+			Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.D_1)
+		));
+	}
+
+	@Test
+	public void testNotFollowedByAnyBeforeOneOrMoreCombinations() {
+		final List<List<Event>> matches = testNotFollowedByBeforeOneOrMore(false, true);
+
+		compareMaps(matches, Lists.<List<Event>>newArrayList(
+			Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.B_4, NotFollowByData.B_5, NotFollowByData.B_6, NotFollowByData.D_1),
+			Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.B_4, NotFollowByData.B_6, NotFollowByData.D_1),
+			Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.B_4, NotFollowByData.B_5, NotFollowByData.D_1),
+			Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.B_4, NotFollowByData.D_1),
+			Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.B_5, NotFollowByData.B_6, NotFollowByData.D_1),
+			Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.B_5, NotFollowByData.D_1),
+			Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.B_6, NotFollowByData.D_1),
+			Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.D_1)
+		));
+	}
+
+	@Test
+	public void testNotFollowedByBeforeOneOrMoreEager() {
+		final List<List<Event>> matches = testNotFollowedByBeforeOneOrMore(true, false);
+
+		compareMaps(matches, Lists.<List<Event>>newArrayList(
+			Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.B_4, NotFollowByData.B_5, NotFollowByData.B_6, NotFollowByData.D_1),
+			Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.B_4, NotFollowByData.B_5, NotFollowByData.D_1),
+			Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.B_4, NotFollowByData.D_1),
+			Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.D_1)
+		));
+	}
+
+	@Test
+	public void testNotFollowedByBeforeOneOrMoreCombinations() {
+		final List<List<Event>> matches = testNotFollowedByBeforeOneOrMore(false, false);
+
+		compareMaps(matches, Lists.<List<Event>>newArrayList(
+			Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.B_4, NotFollowByData.B_5, NotFollowByData.B_6, NotFollowByData.D_1),
+			Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.B_4, NotFollowByData.B_6, NotFollowByData.D_1),
+			Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.B_4, NotFollowByData.B_5, NotFollowByData.D_1),
+			Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.B_4, NotFollowByData.D_1),
+			Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.B_5, NotFollowByData.B_6, NotFollowByData.D_1),
+			Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.B_5, NotFollowByData.D_1),
+			Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.B_6, NotFollowByData.D_1),
+			Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.D_1)
+		));
+	}
+
+	private List<List<Event>> testNotFollowedByBeforeOneOrMore(boolean eager, boolean allMatches) {
+		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+		// Seven records: A_1, B_1, C_1, B_4..B_6, D_1, stamped with consecutive timestamps 0..6.
+		int i = 0;
+		inputEvents.add(new StreamRecord<>(NotFollowByData.A_1, i++));
+		inputEvents.add(new StreamRecord<>(NotFollowByData.B_1, i++));
+		inputEvents.add(new StreamRecord<>(NotFollowByData.C_1, i++));
+		inputEvents.add(new StreamRecord<>(NotFollowByData.B_4, i++));
+		inputEvents.add(new StreamRecord<>(NotFollowByData.B_5, i++));
+		inputEvents.add(new StreamRecord<>(NotFollowByData.B_6, i++));
+		inputEvents.add(new StreamRecord<>(NotFollowByData.D_1, i));
+		// Stage "a" (name "a") directly followed by the negative stage "not c" (name "c").
+		Pattern<Event, ?> pattern = Pattern
+			.<Event>begin("a").where(new SimpleCondition<Event>() {
+				private static final long serialVersionUID = 5726188262756267490L;
+
+				@Override
+				public boolean filter(Event value) throws Exception {
+					return value.getName().equals("a");
+				}
+			})
+			.notFollowedBy("not c").where(new SimpleCondition<Event>() {
+				private static final long serialVersionUID = 5726188262756267490L;
+
+				@Override
+				public boolean filter(Event value) throws Exception {
+					return value.getName().equals("c");
+				}
+			});
+		// Looping stage "b*" (name "b", oneOrMore): allMatches chooses followedByAny over followedBy.
+		pattern = (allMatches ? pattern.followedByAny("b*") : pattern.followedBy("b*"))
+			.where(new SimpleCondition<Event>() {
+				private static final long serialVersionUID = 5726188262756267490L;
+
+				@Override
+				public boolean filter(Event value) throws Exception {
+					return value.getName().equals("b");
+				}
+			}).oneOrMore();
+		// Unless eager, allowCombinations() is applied to the loop; the final stage "d" (name "d") uses followedBy.
+		pattern = (eager ? pattern : pattern.allowCombinations())
+			.followedBy("d").where(new SimpleCondition<Event>() {
+				private static final long serialVersionUID = 5726188262756267490L;
+
+				@Override
+				public boolean filter(Event value) throws Exception {
+					return value.getName().equals("d");
+				}
+			});
+
+		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+		return feedNFA(inputEvents, nfa);
+	}
+
+	@Test
+	public void testNotFollowedByBeforeZeroOrMoreEagerSkipTillNext() {
+		final List<List<Event>> matches = testNotFollowedByBeforeZeroOrMore(true, false);
+		compareMaps(matches, Lists.<List<Event>>newArrayList(
+			Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.B_4, NotFollowByData.B_5, NotFollowByData.B_6, NotFollowByData.D_1),
+			Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.B_4, NotFollowByData.B_5, NotFollowByData.D_1),
+			Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.B_4, NotFollowByData.D_1),
+			Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.D_1)
+		));
+	}
+
+	@Test
+	public void testNotFollowedByBeforeZeroOrMoreCombinationsSkipTillNext() {
+		final List<List<Event>> matches = testNotFollowedByBeforeZeroOrMore(false, false);
+		compareMaps(matches, Lists.<List<Event>>newArrayList(
+			Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.B_4, NotFollowByData.B_5, NotFollowByData.B_6, NotFollowByData.D_1),
+			Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.B_4, NotFollowByData.B_5, NotFollowByData.D_1),
+			Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.B_4, NotFollowByData.B_6, NotFollowByData.D_1),
+			Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.B_4, NotFollowByData.D_1),
+			Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.D_1),
+			Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.B_5, NotFollowByData.B_6, NotFollowByData.D_1),
+			Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.B_5, NotFollowByData.D_1),
+			Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.B_6, NotFollowByData.D_1)
+		));
+	}
+
+	@Test
+	public void testNotFollowedByBeforeZeroOrMoreEagerSkipTillAny() {
+		final List<List<Event>> matches = testNotFollowedByBeforeZeroOrMore(true, true);
+		compareMaps(matches, Lists.<List<Event>>newArrayList(
+			Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.B_4, NotFollowByData.B_5, NotFollowByData.B_6, NotFollowByData.D_1),
+			Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.B_4, NotFollowByData.B_5, NotFollowByData.D_1),
+			Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.B_4, NotFollowByData.D_1),
+			Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.D_1)
+		));
+	}
+
+	@Test
+	public void testNotFollowedByBeforeZeroOrMoreCombinationsSkipTillAny() {
+		final List<List<Event>> matches = testNotFollowedByBeforeZeroOrMore(false, true);
+		compareMaps(matches, Lists.<List<Event>>newArrayList(
+			Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.B_4, NotFollowByData.B_5, NotFollowByData.B_6, NotFollowByData.D_1),
+			Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.B_4, NotFollowByData.B_5, NotFollowByData.D_1),
+			Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.B_4, NotFollowByData.B_6, NotFollowByData.D_1),
+			Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.B_4, NotFollowByData.D_1),
+			Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.D_1),
+			Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.B_5, NotFollowByData.B_6, NotFollowByData.D_1),
+			Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.B_5, NotFollowByData.D_1),
+			Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.B_6, NotFollowByData.D_1)
+		));
+	}
+
+	private List<List<Event>> testNotFollowedByBeforeZeroOrMore(boolean eager, boolean allMatches) {
+		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+		// Seven records: A_1, B_1, C_1, B_4..B_6, D_1, stamped with consecutive timestamps 0..6.
+		int i = 0;
+		inputEvents.add(new StreamRecord<>(NotFollowByData.A_1, i++));
+		inputEvents.add(new StreamRecord<>(NotFollowByData.B_1, i++));
+		inputEvents.add(new StreamRecord<>(NotFollowByData.C_1, i++));
+		inputEvents.add(new StreamRecord<>(NotFollowByData.B_4, i++));
+		inputEvents.add(new StreamRecord<>(NotFollowByData.B_5, i++));
+		inputEvents.add(new StreamRecord<>(NotFollowByData.B_6, i++));
+		inputEvents.add(new StreamRecord<>(NotFollowByData.D_1, i));
+		// Stage "a" (name "a") directly followed by the negative stage "not c" (name "c").
+		Pattern<Event, ?> pattern = Pattern
+			.<Event>begin("a").where(new SimpleCondition<Event>() {
+				private static final long serialVersionUID = 5726188262756267490L;
+
+				@Override
+				public boolean filter(Event value) throws Exception {
+					return value.getName().equals("a");
+				}
+			})
+			.notFollowedBy("not c").where(new SimpleCondition<Event>() {
+				private static final long serialVersionUID = 5726188262756267490L;
+
+				@Override
+				public boolean filter(Event value) throws Exception {
+					return value.getName().equals("c");
+				}
+			});
+		// Looping stage "b*" (name "b", oneOrMore().optional()): allMatches chooses followedByAny over followedBy.
+		pattern = (allMatches ? pattern.followedByAny("b*") : pattern.followedBy("b*"))
+			.where(new SimpleCondition<Event>() {
+				private static final long serialVersionUID = 5726188262756267490L;
+
+				@Override
+				public boolean filter(Event value) throws Exception {
+					return value.getName().equals("b");
+				}
+			}).oneOrMore().optional();
+		// Unless eager, allowCombinations() is applied to the loop; the final stage "d" (name "d") uses followedBy.
+		pattern = (eager ? pattern : pattern.allowCombinations())
+			.followedBy("d").where(new SimpleCondition<Event>() {
+				private static final long serialVersionUID = 5726188262756267490L;
+
+				@Override
+				public boolean filter(Event value) throws Exception {
+					return value.getName().equals("d");
+				}
+			});
+
+		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+		return feedNFA(inputEvents, nfa);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c9e574bf/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SameElementITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SameElementITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SameElementITCase.java
new file mode 100644
index 0000000..d378a74
--- /dev/null
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SameElementITCase.java
@@ -0,0 +1,407 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.nfa;
+
+import org.apache.flink.cep.Event;
+import org.apache.flink.cep.nfa.compiler.NFACompiler;
+import org.apache.flink.cep.pattern.Pattern;
+import org.apache.flink.cep.pattern.conditions.IterativeCondition;
+import org.apache.flink.cep.pattern.conditions.SimpleCondition;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.TestLogger;
+
+import com.google.common.collect.Lists;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.flink.cep.nfa.NFATestUtilities.compareMaps;
+import static org.apache.flink.cep.nfa.NFATestUtilities.feedNFA;
+
+/**
+ * Tests for handling Events that are equal in case of {@link Object#equals(Object)} and have same timestamps.
+ */
+@SuppressWarnings("unchecked")
+public class SameElementITCase extends TestLogger {
+
+	@Test
+	public void testEagerZeroOrMoreSameElement() {
+		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+		Event startEvent = new Event(40, "c", 1.0);
+		Event middleEvent1 = new Event(41, "a", 2.0);
+		Event middleEvent2 = new Event(42, "a", 3.0);
+		Event middleEvent3 = new Event(43, "a", 4.0);
+		Event end1 = new Event(44, "b", 5.0);
+
+		inputEvents.add(new StreamRecord<>(startEvent, 1));
+		inputEvents.add(new StreamRecord<>(middleEvent1, 3));
+		inputEvents.add(new StreamRecord<>(middleEvent1, 3));
+		inputEvents.add(new StreamRecord<>(middleEvent1, 3));
+		inputEvents.add(new StreamRecord<>(middleEvent2, 4));
+		inputEvents.add(new StreamRecord<>(new Event(50, "d", 6.0), 5));
+		inputEvents.add(new StreamRecord<>(middleEvent3, 6));
+		inputEvents.add(new StreamRecord<>(middleEvent3, 6));
+		inputEvents.add(new StreamRecord<>(end1, 7));
+
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("c");
+			}
+		}).followedBy("middle").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("a");
+			}
+		}).oneOrMore().optional().followedBy("end1").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("b");
+			}
+		});
+
+		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
+
+		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
+			Lists.newArrayList(startEvent, middleEvent1, middleEvent1, middleEvent1, middleEvent2, middleEvent3, middleEvent3, end1),
+			Lists.newArrayList(startEvent, middleEvent1, middleEvent1, middleEvent1, middleEvent2, middleEvent3, end1),
+			Lists.newArrayList(startEvent, middleEvent1, middleEvent1, middleEvent1, middleEvent2, end1),
+			Lists.newArrayList(startEvent, middleEvent1, middleEvent1, middleEvent1, end1),
+			Lists.newArrayList(startEvent, middleEvent1, middleEvent1, end1),
+			Lists.newArrayList(startEvent, middleEvent1, end1),
+			Lists.newArrayList(startEvent, end1)
+		));
+	}
+
+	@Test
+	public void testZeroOrMoreSameElement() {
+		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+		// middleEvent1/middleEvent1a and middleEvent3/middleEvent3a are distinct instances built from identical arguments.
+		Event startEvent = new Event(40, "c", 1.0);
+		Event middleEvent1 = new Event(41, "a", 2.0);
+		Event middleEvent1a = new Event(41, "a", 2.0);
+		Event middleEvent2 = new Event(42, "a", 3.0);
+		Event middleEvent3 = new Event(43, "a", 4.0);
+		Event middleEvent3a = new Event(43, "a", 4.0);
+		Event end1 = new Event(44, "b", 5.0);
+		// Each such pair is fed with one shared timestamp (3 and 6 respectively); every instance is fed exactly once.
+		inputEvents.add(new StreamRecord<>(startEvent, 1));
+		inputEvents.add(new StreamRecord<>(middleEvent1, 3));
+		inputEvents.add(new StreamRecord<>(middleEvent1a, 3));
+		inputEvents.add(new StreamRecord<>(middleEvent2, 4));
+		inputEvents.add(new StreamRecord<>(new Event(50, "d", 6.0), 5));
+		inputEvents.add(new StreamRecord<>(middleEvent3, 6));
+		inputEvents.add(new StreamRecord<>(middleEvent3a, 6));
+		inputEvents.add(new StreamRecord<>(end1, 7));
+		// "start" (name "c"), optional looping "middle" (name "a") with combinations, "end1" (name "b"); both links use followedByAny.
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("c");
+			}
+		}).followedByAny("middle").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("a");
+			}
+		}).oneOrMore().optional().allowCombinations().followedByAny("end1").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("b");
+			}
+		});
+
+		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
+		// Expected: one match per subset of the five "middle" events (32 lists), grouped below by subset size, largest first.
+		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
+			Lists.newArrayList(startEvent, middleEvent1, middleEvent1a, middleEvent2, middleEvent3, middleEvent3a, end1),
+			// four "middle" events
+			Lists.newArrayList(startEvent, middleEvent1, middleEvent1a, middleEvent2, middleEvent3, end1),
+			Lists.newArrayList(startEvent, middleEvent1, middleEvent1a, middleEvent2, middleEvent3a, end1),
+			Lists.newArrayList(startEvent, middleEvent1, middleEvent1a, middleEvent3, middleEvent3a, end1),
+			Lists.newArrayList(startEvent, middleEvent1, middleEvent2, middleEvent3, middleEvent3a, end1),
+			Lists.newArrayList(startEvent, middleEvent1a, middleEvent2, middleEvent3, middleEvent3a, end1),
+			// three "middle" events
+			Lists.newArrayList(startEvent, middleEvent1, middleEvent1a, middleEvent2, end1),
+			Lists.newArrayList(startEvent, middleEvent1, middleEvent1a, middleEvent3, end1),
+			Lists.newArrayList(startEvent, middleEvent1, middleEvent1a, middleEvent3a, end1),
+			Lists.newArrayList(startEvent, middleEvent1, middleEvent2, middleEvent3, end1),
+			Lists.newArrayList(startEvent, middleEvent1, middleEvent2, middleEvent3a, end1),
+			Lists.newArrayList(startEvent, middleEvent1, middleEvent3, middleEvent3a, end1),
+			Lists.newArrayList(startEvent, middleEvent2, middleEvent3, middleEvent3a, end1),
+			Lists.newArrayList(startEvent, middleEvent1a, middleEvent2, middleEvent3, end1),
+			Lists.newArrayList(startEvent, middleEvent1a, middleEvent2, middleEvent3a, end1),
+			Lists.newArrayList(startEvent, middleEvent1a, middleEvent3, middleEvent3a, end1),
+			// two "middle" events (middleEvent1 is fed once, so it pairs with middleEvent1a, never with itself)
+			Lists.newArrayList(startEvent, middleEvent1, middleEvent1a, end1),
+			Lists.newArrayList(startEvent, middleEvent1, middleEvent2, end1),
+			Lists.newArrayList(startEvent, middleEvent1, middleEvent3, end1),
+			Lists.newArrayList(startEvent, middleEvent1, middleEvent3a, end1),
+			Lists.newArrayList(startEvent, middleEvent1a, middleEvent2, end1),
+			Lists.newArrayList(startEvent, middleEvent1a, middleEvent3, end1),
+			Lists.newArrayList(startEvent, middleEvent1a, middleEvent3a, end1),
+			Lists.newArrayList(startEvent, middleEvent2, middleEvent3, end1),
+			Lists.newArrayList(startEvent, middleEvent2, middleEvent3a, end1),
+			Lists.newArrayList(startEvent, middleEvent3, middleEvent3a, end1),
+			// one "middle" event
+			Lists.newArrayList(startEvent, middleEvent1, end1),
+			Lists.newArrayList(startEvent, middleEvent1a, end1),
+			Lists.newArrayList(startEvent, middleEvent2, end1),
+			Lists.newArrayList(startEvent, middleEvent3, end1),
+			Lists.newArrayList(startEvent, middleEvent3a, end1),
+			// no "middle" event: the looping stage is optional
+			Lists.newArrayList(startEvent, end1)
+		));
+	}
+
+	@Test
+	public void testSimplePatternWSameElement() throws Exception {
+		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+		Event startEvent = new Event(40, "c", 1.0);
+		Event middleEvent1 = new Event(41, "a", 2.0);
+		Event end1 = new Event(44, "b", 5.0);
+
+		inputEvents.add(new StreamRecord<>(startEvent, 1));
+		inputEvents.add(new StreamRecord<>(middleEvent1, 3));
+		inputEvents.add(new StreamRecord<>(middleEvent1, 3));
+		inputEvents.add(new StreamRecord<>(end1, 7));
+
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("c");
+			}
+		}).followedByAny("middle").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("a");
+			}
+		}).followedBy("end1").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("b");
+			}
+		});
+
+		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
+
+		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
+			Lists.newArrayList(startEvent, middleEvent1, end1),
+			Lists.newArrayList(startEvent, middleEvent1, end1)
+		));
+	}
+
+	@Test
+	public void testIterativeConditionWSameElement() throws Exception {
+		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+		Event startEvent = new Event(40, "c", 1.0);
+		Event middleEvent1 = new Event(41, "a", 2.0);
+		Event middleEvent1a = new Event(41, "a", 2.0);
+		Event middleEvent1b = new Event(41, "a", 2.0);
+		final Event end = new Event(44, "b", 5.0);
+		// middleEvent1, middleEvent1a and middleEvent1b are built from identical arguments and all fed with timestamp 3.
+		inputEvents.add(new StreamRecord<>(startEvent, 1));
+		inputEvents.add(new StreamRecord<>(middleEvent1, 3));
+		inputEvents.add(new StreamRecord<>(middleEvent1a, 3));
+		inputEvents.add(new StreamRecord<>(middleEvent1b, 3));
+		inputEvents.add(new StreamRecord<>(end, 7));
+
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("c");
+			}
+		}).followedByAny("middle").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("a");
+			}
+		}).oneOrMore().optional().allowCombinations().followedBy("end").where(new IterativeCondition<Event>() {
+			// Accepts any candidate for "end" once the "middle" prices sum to exactly 4.0; the candidate itself is not inspected.
+			private static final long serialVersionUID = -5566639743229703237L;
+
+			@Override
+			public boolean filter(Event value, Context<Event> ctx) throws Exception {
+				double sum = 0.0;
+				for (Event event: ctx.getEventsForPattern("middle")) {
+					sum += event.getPrice();
+				}
+				return Double.compare(sum, 4.0) == 0;
+			}
+
+		});
+
+		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
+		// The second expected match ends in middleEvent1b rather than end: the "end" condition does not look at the event's name.
+		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
+			Lists.newArrayList(startEvent, middleEvent1, middleEvent1a, end),
+			Lists.newArrayList(startEvent, middleEvent1, middleEvent1a, middleEvent1b),
+			Lists.newArrayList(startEvent, middleEvent1a, middleEvent1b, end)
+		));
+	}
+
+	@Test
+	public void testEndWLoopingWSameElement() throws Exception {
+		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+		Event startEvent = new Event(40, "c", 1.0);
+		Event middleEvent1 = new Event(41, "a", 2.0);
+		Event middleEvent1a = new Event(41, "a", 2.0);
+		Event middleEvent1b = new Event(41, "a", 2.0);
+		final Event end = new Event(44, "b", 5.0);
+
+		inputEvents.add(new StreamRecord<>(startEvent, 1));
+		inputEvents.add(new StreamRecord<>(middleEvent1, 3));
+		inputEvents.add(new StreamRecord<>(middleEvent1a, 3));
+		inputEvents.add(new StreamRecord<>(middleEvent1b, 3));
+		inputEvents.add(new StreamRecord<>(end, 7));
+
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("c");
+			}
+		}).followedByAny("middle").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("a");
+			}
+		}).oneOrMore().optional();
+
+		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
+
+		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
+			Lists.newArrayList(startEvent),
+			Lists.newArrayList(startEvent, middleEvent1),
+			Lists.newArrayList(startEvent, middleEvent1a),
+			Lists.newArrayList(startEvent, middleEvent1b),
+			Lists.newArrayList(startEvent, middleEvent1, middleEvent1a),
+			Lists.newArrayList(startEvent, middleEvent1a, middleEvent1b),
+			Lists.newArrayList(startEvent, middleEvent1, middleEvent1a, middleEvent1b)
+		));
+	}
+
+	@Test
+	public void testRepeatingPatternWSameElement() throws Exception {
+		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+		Event startEvent = new Event(40, "c", 1.0);
+		Event middle1Event1 = new Event(40, "a", 2.0);
+		Event middle1Event2 = new Event(40, "a", 3.0);
+		Event middle1Event3 = new Event(40, "a", 4.0);
+		Event middle2Event1 = new Event(40, "b", 5.0);
+
+		inputEvents.add(new StreamRecord<>(startEvent, 1));
+		inputEvents.add(new StreamRecord<>(middle1Event1, 3));
+		inputEvents.add(new StreamRecord<>(middle1Event1, 3));
+		inputEvents.add(new StreamRecord<>(middle1Event2, 3));
+		inputEvents.add(new StreamRecord<>(new Event(40, "d", 6.0), 5));
+		inputEvents.add(new StreamRecord<>(middle2Event1, 6));
+		inputEvents.add(new StreamRecord<>(middle1Event3, 7));
+
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("c");
+			}
+		}).followedBy("middle1").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("a");
+			}
+		}).oneOrMore().optional().followedBy("middle2").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("b");
+			}
+		}).optional().followedBy("end").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("a");
+			}
+		});
+
+		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
+
+		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
+			Lists.newArrayList(startEvent, middle1Event1),
+
+			Lists.newArrayList(startEvent, middle1Event1, middle1Event1),
+			Lists.newArrayList(startEvent, middle2Event1, middle1Event3),
+
+			Lists.newArrayList(startEvent, middle1Event1, middle1Event1, middle1Event2),
+			Lists.newArrayList(startEvent, middle1Event1, middle2Event1, middle1Event3),
+
+			Lists.newArrayList(startEvent, middle1Event1, middle1Event1, middle1Event2, middle1Event3),
+			Lists.newArrayList(startEvent, middle1Event1, middle1Event1, middle2Event1, middle1Event3),
+
+			Lists.newArrayList(startEvent, middle1Event1, middle1Event1, middle1Event2, middle2Event1, middle1Event3)
+		));
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c9e574bf/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SharedBufferTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SharedBufferTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SharedBufferTest.java
index bd828b6..44033c1 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SharedBufferTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SharedBufferTest.java
@@ -18,20 +18,19 @@
 
 package org.apache.flink.cep.nfa;
 
-import com.google.common.collect.ArrayListMultimap;
-import com.google.common.collect.ListMultimap;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.cep.Event;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.util.TestLogger;
+
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.ListMultimap;
 import org.junit.Test;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
 import java.util.Collection;
 import java.util.Collections;
 
@@ -39,6 +38,9 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
+/**
+ * Tests for {@link SharedBuffer}.
+ */
 public class SharedBufferTest extends TestLogger {
 
 	@Test

http://git-wip-us.apache.org/repos/asf/flink/blob/c9e574bf/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java
index 26b8ce9..cd12071 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java
@@ -32,6 +32,7 @@ import org.apache.flink.cep.pattern.MalformedPatternException;
 import org.apache.flink.cep.pattern.Pattern;
 import org.apache.flink.cep.pattern.conditions.SimpleCondition;
 import org.apache.flink.util.TestLogger;
+
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
@@ -45,6 +46,9 @@ import static com.google.common.collect.Sets.newHashSet;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
+/**
+ * Tests for {@link NFACompiler}.
+ */
 public class NFACompilerTest extends TestLogger {
 
 	private static final SimpleCondition<Event> startFilter = new SimpleCondition<Event>() {
@@ -116,7 +120,7 @@ public class NFACompilerTest extends TestLogger {
 	}
 
 	/**
-	 * Tests that the NFACompiler generates the correct NFA from a given Pattern
+	 * Tests that the NFACompiler generates the correct NFA from a given Pattern.
 	 */
 	@Test
 	public void testNFACompilerWithSimplePattern() {

http://git-wip-us.apache.org/repos/asf/flink/blob/c9e574bf/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPFrom12MigrationTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPFrom12MigrationTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPFrom12MigrationTest.java
index 0345192..f5a909b 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPFrom12MigrationTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPFrom12MigrationTest.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.cep.operator;
 
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
@@ -33,6 +34,7 @@ import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
 import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.OperatorSnapshotUtil;
+
 import org.junit.Ignore;
 import org.junit.Test;
 
@@ -463,7 +465,6 @@ public class CEPFrom12MigrationTest {
 		}
 	}
 
-
 	@Test
 	public void testSinglePatternAfterMigration() throws Exception {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c9e574bf/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java
index c92f772..69ba42f 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java
@@ -35,6 +35,7 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
 import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+
 import org.junit.Test;
 
 import java.net.URL;
@@ -45,6 +46,9 @@ import java.util.concurrent.ConcurrentLinkedQueue;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
+/**
+ * Tests for migration from 1.1.x to 1.3.x.
+ */
 public class CEPMigration11to13Test {
 
 	private static String getResourceFilename(String filename) {
@@ -198,7 +202,7 @@ public class CEPMigration11to13Test {
 
 		final Event startEvent = new Event(42, "start", 1.0);
 		final SubEvent middleEvent = new SubEvent(42, "foo", 1.0, 10.0);
-		final Event endEvent=  new Event(42, "end", 1.0);
+		final Event endEvent = new Event(42, "end", 1.0);
 
 		// uncomment these lines for regenerating the snapshot on Flink 1.1
 		/*

http://git-wip-us.apache.org/repos/asf/flink/blob/c9e574bf/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
index 38ad0f1..d83c191 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.cep.operator;
 
-import com.google.common.collect.Lists;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
@@ -42,13 +41,13 @@ import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.types.Either;
 import org.apache.flink.util.TestLogger;
+
+import com.google.common.collect.Lists;
 import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
-import static org.junit.Assert.*;
-
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
@@ -57,6 +56,12 @@ import java.util.List;
 import java.util.Map;
 import java.util.Queue;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for {@link KeyedCEPPatternOperator} and {@link TimeoutKeyedCEPPatternOperator}.
+ */
 public class CEPOperatorTest extends TestLogger {
 
 	@Rule
@@ -269,7 +274,7 @@ public class CEPOperatorTest extends TestLogger {
 			assertTrue(resultObject instanceof StreamRecord);
 
 			StreamRecord<Either<Tuple2<Map<String, List<Event>>, Long>, Map<String, List<Event>>>> streamRecord =
-					(StreamRecord<Either<Tuple2<Map<String,List<Event>>,Long>,Map<String,List<Event>>>>) resultObject;
+					(StreamRecord<Either<Tuple2<Map<String, List<Event>>, Long>, Map<String, List<Event>>>>) resultObject;
 
 			assertTrue(streamRecord.getValue() instanceof Either.Left);
 
@@ -299,8 +304,8 @@ public class CEPOperatorTest extends TestLogger {
 		SubEvent middleEvent1 = new SubEvent(42, "foo1", 1.0, 10.0);
 		SubEvent middleEvent2 = new SubEvent(42, "foo2", 1.0, 10.0);
 		SubEvent middleEvent3 = new SubEvent(42, "foo3", 1.0, 10.0);
-		Event endEvent1 =  new Event(42, "end", 1.0);
-		Event endEvent2 =  new Event(42, "end", 2.0);
+		Event endEvent1 = new Event(42, "end", 1.0);
+		Event endEvent2 = new Event(42, "end", 2.0);
 
 		Event startEventK2 = new Event(43, "start", 1.0);
 
@@ -493,8 +498,8 @@ public class CEPOperatorTest extends TestLogger {
 		SubEvent middleEvent1 = new SubEvent(42, "foo1", 1.0, 10.0);
 		SubEvent middleEvent2 = new SubEvent(42, "foo2", 1.0, 10.0);
 		SubEvent middleEvent3 = new SubEvent(42, "foo3", 1.0, 10.0);
-		Event endEvent1 =  new Event(42, "end", 1.0);
-		Event endEvent2 =  new Event(42, "end", 2.0);
+		Event endEvent1 = new Event(42, "end", 1.0);
+		Event endEvent2 = new Event(42, "end", 2.0);
 
 		Event startEventK2 = new Event(43, "start", 1.0);
 
@@ -747,7 +752,6 @@ public class CEPOperatorTest extends TestLogger {
 		Assert.assertArrayEquals(expected.toArray(), actual.toArray());
 	}
 
-
 	private class ListEventComparator implements Comparator<List<Event>> {
 
 		@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/c9e574bf/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPRescalingTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPRescalingTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPRescalingTest.java
index 86be09c..40514df 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPRescalingTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPRescalingTest.java
@@ -35,6 +35,7 @@ import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
 import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+
 import org.junit.Test;
 
 import java.util.List;
@@ -44,6 +45,9 @@ import java.util.Queue;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
+/**
+ * Tests for rescaling of CEP operators.
+ */
 public class CEPRescalingTest {
 
 	@Test
@@ -64,7 +68,7 @@ public class CEPRescalingTest {
 
 		Event startEvent1 = new Event(7, "start", 1.0);
 		SubEvent middleEvent1 = new SubEvent(7, "foo", 1.0, 10.0);
-		Event endEvent1=  new Event(7, "end", 1.0);
+		Event endEvent1 = new Event(7, "end", 1.0);
 
 		int keygroup = KeyGroupRangeAssignment.assignToKeyGroup(keySelector.getKey(startEvent1), maxParallelism);
 		assertEquals(1, keygroup);

http://git-wip-us.apache.org/repos/asf/flink/blob/c9e574bf/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/pattern/PatternTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/pattern/PatternTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/pattern/PatternTest.java
index b6fb484..e00384b 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/pattern/PatternTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/pattern/PatternTest.java
@@ -25,15 +25,23 @@ import org.apache.flink.cep.pattern.conditions.OrCondition;
 import org.apache.flink.cep.pattern.conditions.SimpleCondition;
 import org.apache.flink.cep.pattern.conditions.SubtypeCondition;
 import org.apache.flink.util.TestLogger;
+
 import org.junit.Test;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
 
+/**
+ * Tests for constructing {@link Pattern}.
+ */
 public class PatternTest extends TestLogger {
+
 	/**
-	 * These test simply test that the pattern construction completes without failure
+	 * These tests simply check that the pattern construction completes without failure.
 	 */
-
 	@Test
 	public void testStrictContiguity() {
 		Pattern<Object, ?> pattern = Pattern.begin("start").next("next").next("end");


Mime
View raw message