flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From fhue...@apache.org
Subject [33/50] [abbrv] flink git commit: [FLINK-6197] [cep] Add support for iterative conditions.
Date Thu, 30 Mar 2017 22:04:44 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/7fbdc100/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/OrCondition.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/OrCondition.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/OrCondition.java
new file mode 100644
index 0000000..6aaa4bb
--- /dev/null
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/OrCondition.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.pattern.conditions;
+
+/**
+ * A {@link IterativeCondition condition} which combines two conditions with a short-circuit logical {@code OR}:
+ * it accepts an element if the left condition accepts it or, failing that, if the right condition does.
+ *
+ * @param <T> Type of the element to filter
+ */
+public class OrCondition<T> extends IterativeCondition<T> {
+
+	private static final long serialVersionUID = 2554610954278485106L;
+
+	private final IterativeCondition<T> left;
+	private final IterativeCondition<T> right;
+
+	public OrCondition(final IterativeCondition<T> left, final IterativeCondition<T> right) {
+		this.left = left;
+		this.right = right;
+	}
+
+	@Override
+	public boolean filter(T value, Context<T> ctx) throws Exception {
+		return left.filter(value, ctx) || right.filter(value, ctx);
+	}
+
+	/**
+	 * @return The left {@link IterativeCondition condition}, which {@code filter} always evaluates first.
+	 */
+	public IterativeCondition<T> getLeft() {
+		return left;
+	}
+
+	/**
+	 * @return The right {@link IterativeCondition condition}, evaluated only if the left one rejects the element.
+	 */
+	public IterativeCondition<T> getRight() {
+		return right;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7fbdc100/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/SimpleCondition.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/SimpleCondition.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/SimpleCondition.java
new file mode 100644
index 0000000..9ca52c5
--- /dev/null
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/SimpleCondition.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.pattern.conditions;
+
+import org.apache.flink.api.common.functions.FilterFunction;
+
+/**
+ * A user-defined condition that decides if an element should be accepted in the pattern or not.
+ * Accepting an element also signals a state transition for the corresponding {@link org.apache.flink.cep.nfa.NFA}.
+ *
+ * <p>Contrary to the {@link IterativeCondition}, conditions that extend this class do not have access to the
+ * previously accepted elements in the pattern: the context-aware {@code filter(value, ctx)} ignores its context and
+ * delegates to the single-argument {@code filter(value)}, which decides based on the element at hand alone.
+ */
+public abstract class SimpleCondition<T> extends IterativeCondition<T> implements FilterFunction<T> {
+
+	private static final long serialVersionUID = 4942618239408140245L;
+
+	@Override
+	public boolean filter(T value, Context<T> ctx) throws Exception {
+		return filter(value);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7fbdc100/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/SubtypeCondition.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/SubtypeCondition.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/SubtypeCondition.java
new file mode 100644
index 0000000..91f6c21
--- /dev/null
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/SubtypeCondition.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.pattern.conditions;
+
+/**
+ * A {@link IterativeCondition condition} which accepts only non-null elements whose runtime class is
+ * assignable to the given subtype of {@code T}; a {@code null} element raises a {@link NullPointerException}.
+ *
+ * @param <T> Type of the elements to be filtered
+ */
+public class SubtypeCondition<T> extends SimpleCondition<T> {
+	private static final long serialVersionUID = -2990017519957561355L;
+
+	/** The class that an element's runtime class must be assignable to in order to be accepted. */
+	private final Class<? extends T> subtype;
+
+	public SubtypeCondition(final Class<? extends T> subtype) {
+		this.subtype = subtype;
+	}
+
+	@Override
+	public boolean filter(T value) throws Exception {
+		return subtype.isAssignableFrom(value.getClass());
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7fbdc100/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java
index 5887017..42117ee 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java
@@ -18,11 +18,11 @@
 
 package org.apache.flink.cep;
 
-import org.apache.flink.api.common.functions.FilterFunction;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.cep.pattern.Pattern;
+import org.apache.flink.cep.pattern.conditions.SimpleCondition;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.datastream.DataStream;
@@ -81,7 +81,7 @@ public class CEPITCase extends StreamingMultipleProgramsTestBase {
 			new Event(8, "end", 1.0)
 		);
 
-		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new FilterFunction<Event>() {
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
 
 			@Override
 			public boolean filter(Event value) throws Exception {
@@ -89,7 +89,7 @@ public class CEPITCase extends StreamingMultipleProgramsTestBase {
 			}
 		})
 		.followedBy("middle").subtype(SubEvent.class).where(
-				new FilterFunction<SubEvent>() {
+				new SimpleCondition<SubEvent>() {
 
 					@Override
 					public boolean filter(SubEvent value) throws Exception {
@@ -97,7 +97,7 @@ public class CEPITCase extends StreamingMultipleProgramsTestBase {
 					}
 				}
 			)
-		.followedBy("end").where(new FilterFunction<Event>() {
+		.followedBy("end").where(new SimpleCondition<Event>() {
 
 			@Override
 			public boolean filter(Event value) throws Exception {
@@ -156,7 +156,7 @@ public class CEPITCase extends StreamingMultipleProgramsTestBase {
 			}
 		});
 
-		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new FilterFunction<Event>() {
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
 
 			@Override
 			public boolean filter(Event value) throws Exception {
@@ -164,7 +164,7 @@ public class CEPITCase extends StreamingMultipleProgramsTestBase {
 			}
 		})
 			.followedBy("middle").subtype(SubEvent.class).where(
-				new FilterFunction<SubEvent>() {
+				new SimpleCondition<SubEvent>() {
 
 					@Override
 					public boolean filter(SubEvent value) throws Exception {
@@ -172,7 +172,7 @@ public class CEPITCase extends StreamingMultipleProgramsTestBase {
 					}
 				}
 			)
-			.followedBy("end").where(new FilterFunction<Event>() {
+			.followedBy("end").where(new SimpleCondition<Event>() {
 
 				@Override
 				public boolean filter(Event value) throws Exception {
@@ -236,19 +236,19 @@ public class CEPITCase extends StreamingMultipleProgramsTestBase {
 			}
 		});
 
-		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new FilterFunction<Event>() {
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
 
 			@Override
 			public boolean filter(Event value) throws Exception {
 				return value.getName().equals("start");
 			}
-		}).followedBy("middle").where(new FilterFunction<Event>() {
+		}).followedBy("middle").where(new SimpleCondition<Event>() {
 
 			@Override
 			public boolean filter(Event value) throws Exception {
 				return value.getName().equals("middle");
 			}
-		}).followedBy("end").where(new FilterFunction<Event>() {
+		}).followedBy("end").where(new SimpleCondition<Event>() {
 
 			@Override
 			public boolean filter(Event value) throws Exception {
@@ -325,19 +325,19 @@ public class CEPITCase extends StreamingMultipleProgramsTestBase {
 			}
 		});
 
-		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new FilterFunction<Event>() {
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
 
 			@Override
 			public boolean filter(Event value) throws Exception {
 				return value.getName().equals("start");
 			}
-		}).followedBy("middle").where(new FilterFunction<Event>() {
+		}).followedBy("middle").where(new SimpleCondition<Event>() {
 
 			@Override
 			public boolean filter(Event value) throws Exception {
 				return value.getName().equals("middle");
 			}
-		}).followedBy("end").where(new FilterFunction<Event>() {
+		}).followedBy("end").where(new SimpleCondition<Event>() {
 
 			@Override
 			public boolean filter(Event value) throws Exception {
@@ -378,7 +378,7 @@ public class CEPITCase extends StreamingMultipleProgramsTestBase {
 
 		Pattern<Tuple2<Integer, Integer>, ?> pattern =
 			Pattern.<Tuple2<Integer, Integer>>begin("start")
-				.where(new FilterFunction<Tuple2<Integer, Integer>>() {
+				.where(new SimpleCondition<Tuple2<Integer, Integer>>() {
 					@Override
 					public boolean filter(Tuple2<Integer, Integer> rec) throws Exception {
 						return rec.f1 == 1;
@@ -456,19 +456,19 @@ public class CEPITCase extends StreamingMultipleProgramsTestBase {
 			}
 		});
 
-		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new FilterFunction<Event>() {
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
 
 			@Override
 			public boolean filter(Event value) throws Exception {
 				return value.getName().equals("start");
 			}
-		}).followedBy("middle").where(new FilterFunction<Event>() {
+		}).followedBy("middle").where(new SimpleCondition<Event>() {
 
 			@Override
 			public boolean filter(Event value) throws Exception {
 				return value.getName().equals("middle");
 			}
-		}).followedBy("end").where(new FilterFunction<Event>() {
+		}).followedBy("end").where(new SimpleCondition<Event>() {
 
 			@Override
 			public boolean filter(Event value) throws Exception {
@@ -524,26 +524,26 @@ public class CEPITCase extends StreamingMultipleProgramsTestBase {
 		);
 
 		Pattern<Event, ?> pattern = Pattern.<Event>begin("start")
-			.where(new FilterFunction<Event>() {
+			.where(new SimpleCondition<Event>() {
 				@Override
 				public boolean filter(Event value) throws Exception {
 					return value.getName().equals("start");
 				}
 			})
 			.followedBy("middle")
-			.where(new FilterFunction<Event>() {
+			.where(new SimpleCondition<Event>() {
 				@Override
 				public boolean filter(Event value) throws Exception {
 					return value.getPrice() == 2.0;
 				}
 			})
-			.or(new FilterFunction<Event>() {
+			.or(new SimpleCondition<Event>() {
 				@Override
 				public boolean filter(Event value) throws Exception {
 					return value.getPrice() == 5.0;
 				}
 			})
-			.followedBy("end").where(new FilterFunction<Event>() {
+			.followedBy("end").where(new SimpleCondition<Event>() {
 
 				@Override
 				public boolean filter(Event value) throws Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/7fbdc100/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
index 5b05f19..197767e 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
@@ -18,20 +18,26 @@
 
 package org.apache.flink.cep.nfa;
 
+import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
-import org.apache.flink.api.common.functions.FilterFunction;
+import com.google.common.primitives.Doubles;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.cep.Event;
 import org.apache.flink.cep.SubEvent;
 import org.apache.flink.cep.nfa.compiler.NFACompiler;
 import org.apache.flink.cep.pattern.Pattern;
+import org.apache.flink.cep.pattern.conditions.IterativeCondition;
+import org.apache.flink.cep.pattern.conditions.SimpleCondition;
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.util.TestLogger;
+import org.junit.Assert;
 import org.junit.Test;
 
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -39,7 +45,6 @@ import java.util.Map;
 import java.util.Set;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
 
 public class NFAITCase extends TestLogger {
 
@@ -58,23 +63,21 @@ public class NFAITCase extends TestLogger {
 		inputEvents.add(new StreamRecord<Event>(new Event(43, "start", 1.0), 4));
 		inputEvents.add(new StreamRecord<Event>(endEvent, 5));
 
-		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new FilterFunction<Event>() {
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
 			private static final long serialVersionUID = 5726188262756267490L;
 
 			@Override
 			public boolean filter(Event value) throws Exception {
 				return value.getName().equals("start");
 			}
-		})
-		.followedBy("middle").subtype(SubEvent.class).where(new FilterFunction<SubEvent>() {
-				private static final long serialVersionUID = 6215754202506583964L;
+		}).followedBy("middle").subtype(SubEvent.class).where(new SimpleCondition<SubEvent>() {
+			private static final long serialVersionUID = 6215754202506583964L;
 
-				@Override
-				public boolean filter(SubEvent value) throws Exception {
-					return value.getVolume() > 5.0;
-				}
-			})
-		.followedBy("end").where(new FilterFunction<Event>() {
+			@Override
+			public boolean filter(SubEvent value) throws Exception {
+				return value.getVolume() > 5.0;
+			}
+		}).followedBy("end").where(new SimpleCondition<Event>() {
 			private static final long serialVersionUID = 7056763917392056548L;
 
 			@Override
@@ -113,14 +116,14 @@ public class NFAITCase extends TestLogger {
 		inputEvents.add(new StreamRecord<>(middleEvent1, 3));
 		inputEvents.add(new StreamRecord<>(end, 5));
 
-		Pattern<Event, ?> pattern = Pattern.<Event>begin("middle").where(new FilterFunction<Event>() {
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("middle").where(new SimpleCondition<Event>() {
 			private static final long serialVersionUID = 5726188262756267490L;
 
 			@Override
 			public boolean filter(Event value) throws Exception {
 				return value.getName().equals("a");
 			}
-		}).next("end").where(new FilterFunction<Event>() {
+		}).next("end").where(new SimpleCondition<Event>() {
 			private static final long serialVersionUID = 5726188262756267490L;
 
 			@Override
@@ -163,14 +166,14 @@ public class NFAITCase extends TestLogger {
 		inputEvents.add(new StreamRecord<>(middleEvent2, 4));
 		inputEvents.add(new StreamRecord<>(end, 5));
 
-		Pattern<Event, ?> pattern = Pattern.<Event>begin("middle").where(new FilterFunction<Event>() {
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("middle").where(new SimpleCondition<Event>() {
 			private static final long serialVersionUID = 5726188262756267490L;
 
 			@Override
 			public boolean filter(Event value) throws Exception {
 				return value.getName().equals("a");
 			}
-		}).next("end").where(new FilterFunction<Event>() {
+		}).next("end").where(new SimpleCondition<Event>() {
 			private static final long serialVersionUID = 5726188262756267490L;
 
 			@Override
@@ -217,21 +220,21 @@ public class NFAITCase extends TestLogger {
 		events.add(new StreamRecord<Event>(new Event(6, "end", 1.0), 13));
 
 
-		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new FilterFunction<Event>() {
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
 			private static final long serialVersionUID = 7907391379273505897L;
 
 			@Override
 			public boolean filter(Event value) throws Exception {
 				return value.getName().equals("start");
 			}
-		}).followedBy("middle").where(new FilterFunction<Event>() {
+		}).followedBy("middle").where(new SimpleCondition<Event>() {
 			private static final long serialVersionUID = -3268741540234334074L;
 
 			@Override
 			public boolean filter(Event value) throws Exception {
 				return value.getName().equals("middle");
 			}
-		}).followedBy("end").where(new FilterFunction<Event>() {
+		}).followedBy("end").where(new SimpleCondition<Event>() {
 			private static final long serialVersionUID = -8995174172182138608L;
 
 			@Override
@@ -240,11 +243,12 @@ public class NFAITCase extends TestLogger {
 			}
 		}).within(Time.milliseconds(10));
 
-
 		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
 
 		for (StreamRecord<Event> event: events) {
-			Collection<Map<String, Event>> patterns = nfa.process(event.getValue(), event.getTimestamp()).f0;
+			Collection<Map<String, Event>> patterns = nfa.process(
+					event.getValue(),
+					event.getTimestamp()).f0;
 
 			resultingPatterns.addAll(patterns);
 		}
@@ -269,7 +273,6 @@ public class NFAITCase extends TestLogger {
 		Set<Tuple2<Map<String, Event>, Long>> resultingTimeoutPatterns = new HashSet<>();
 		Set<Tuple2<Map<String, Event>, Long>> expectedTimeoutPatterns = new HashSet<>();
 
-
 		events.add(new StreamRecord<Event>(new Event(1, "start", 1.0), 1));
 		events.add(new StreamRecord<Event>(new Event(2, "start", 1.0), 2));
 		events.add(new StreamRecord<Event>(new Event(3, "middle", 1.0), 3));
@@ -296,21 +299,21 @@ public class NFAITCase extends TestLogger {
 		expectedTimeoutPatterns.add(Tuple2.of(timeoutPattern3, 11L));
 		expectedTimeoutPatterns.add(Tuple2.of(timeoutPattern4, 13L));
 
-		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new FilterFunction<Event>() {
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
 			private static final long serialVersionUID = 7907391379273505897L;
 
 			@Override
 			public boolean filter(Event value) throws Exception {
 				return value.getName().equals("start");
 			}
-		}).followedBy("middle").where(new FilterFunction<Event>() {
+		}).followedBy("middle").where(new SimpleCondition<Event>() {
 			private static final long serialVersionUID = -3268741540234334074L;
 
 			@Override
 			public boolean filter(Event value) throws Exception {
 				return value.getName().equals("middle");
 			}
-		}).followedBy("end").where(new FilterFunction<Event>() {
+		}).followedBy("end").where(new SimpleCondition<Event>() {
 			private static final long serialVersionUID = -8995174172182138608L;
 
 			@Override
@@ -319,7 +322,6 @@ public class NFAITCase extends TestLogger {
 			}
 		}).within(Time.milliseconds(10));
 
-
 		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), true);
 
 		for (StreamRecord<Event> event: events) {
@@ -359,38 +361,35 @@ public class NFAITCase extends TestLogger {
 		inputEvents.add(new StreamRecord<Event>(nextOne2, 7));
 		inputEvents.add(new StreamRecord<Event>(endEvent, 8));
 
-		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new FilterFunction<Event>() {
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
 			private static final long serialVersionUID = 5726188262756267490L;
 
 			@Override
 			public boolean filter(Event value) throws Exception {
 				return value.getName().equals("start");
 			}
-		})
-			.followedBy("middle-first").subtype(SubEvent.class).where(new FilterFunction<SubEvent>() {
-				private static final long serialVersionUID = 6215754202506583964L;
+		}).followedBy("middle-first").subtype(SubEvent.class).where(new SimpleCondition<SubEvent>() {
+			private static final long serialVersionUID = 6215754202506583964L;
 
-				@Override
-				public boolean filter(SubEvent value) throws Exception {
-					return value.getVolume() > 5.0;
-				}
-			})
-			.followedBy("middle-second").subtype(SubEvent.class).where(new FilterFunction<SubEvent>() {
-				private static final long serialVersionUID = 6215754202506583964L;
+			@Override
+			public boolean filter(SubEvent value) throws Exception {
+				return value.getVolume() > 5.0;
+			}
+		}).followedBy("middle-second").subtype(SubEvent.class).where(new SimpleCondition<SubEvent>() {
+			private static final long serialVersionUID = 6215754202506583964L;
 
-				@Override
-				public boolean filter(SubEvent value) throws Exception {
-					return value.getName().equals("next-one");
-				}
-			})
-			.followedBy("end").where(new FilterFunction<Event>() {
-				private static final long serialVersionUID = 7056763917392056548L;
+			@Override
+			public boolean filter(SubEvent value) throws Exception {
+				return value.getName().equals("next-one");
+			}
+		}).followedBy("end").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 7056763917392056548L;
 
-				@Override
-				public boolean filter(Event value) throws Exception {
-					return value.getName().equals("end");
-				}
-			});
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("end");
+			}
+		});
 
 		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
 
@@ -443,44 +442,42 @@ public class NFAITCase extends TestLogger {
 		inputEvents.add(new StreamRecord<>(end3, 8));
 		inputEvents.add(new StreamRecord<>(end4, 9));
 
-		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new FilterFunction<Event>() {
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
 			private static final long serialVersionUID = 5726188262756267490L;
 
 			@Override
 			public boolean filter(Event value) throws Exception {
 				return value.getName().equals("c");
 			}
-		}).followedBy("middle").where(new FilterFunction<Event>() {
+		}).followedBy("middle").where(new SimpleCondition<Event>() {
 			private static final long serialVersionUID = 5726188262756267490L;
 
 			@Override
 			public boolean filter(Event value) throws Exception {
 				return value.getName().equals("a");
 			}
-		}).zeroOrMore(false).followedBy("end1").where(new FilterFunction<Event>() {
+		}).zeroOrMore(false).followedBy("end1").where(new SimpleCondition<Event>() {
 			private static final long serialVersionUID = 5726188262756267490L;
 
 			@Override
 			public boolean filter(Event value) throws Exception {
 				return value.getName().equals("b");
 			}
-		})
-			.followedBy("end2").where(new FilterFunction<Event>() {
-				private static final long serialVersionUID = 5726188262756267490L;
+		}).followedBy("end2").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
 
-				@Override
-				public boolean filter(Event value) throws Exception {
-					return value.getName().equals("d");
-				}
-			})
-			.followedBy("end3").where(new FilterFunction<Event>() {
-				private static final long serialVersionUID = 5726188262756267490L;
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("d");
+			}
+		}).followedBy("end3").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
 
-				@Override
-				public boolean filter(Event value) throws Exception {
-					return value.getName().equals("e");
-				}
-			});
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("e");
+			}
+		});
 
 		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
 
@@ -533,21 +530,21 @@ public class NFAITCase extends TestLogger {
 		inputEvents.add(new StreamRecord<>(middleEvent2, 4));
 		inputEvents.add(new StreamRecord<>(end1, 6));
 
-		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new FilterFunction<Event>() {
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
 			private static final long serialVersionUID = 5726188262756267490L;
 
 			@Override
 			public boolean filter(Event value) throws Exception {
 				return value.getName().equals("c");
 			}
-		}).followedBy("middle").where(new FilterFunction<Event>() {
+		}).followedBy("middle").where(new SimpleCondition<Event>() {
 			private static final long serialVersionUID = 5726188262756267490L;
 
 			@Override
 			public boolean filter(Event value) throws Exception {
 				return value.getName().equals("a");
 			}
-		}).zeroOrMore(false).followedBy("end1").where(new FilterFunction<Event>() {
+		}).zeroOrMore(false).followedBy("end1").where(new SimpleCondition<Event>() {
 			private static final long serialVersionUID = 5726188262756267490L;
 
 			@Override
@@ -597,21 +594,21 @@ public class NFAITCase extends TestLogger {
 		inputEvents.add(new StreamRecord<>(middleEvent3, 5));
 		inputEvents.add(new StreamRecord<>(end1, 6));
 
-		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new FilterFunction<Event>() {
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
 			private static final long serialVersionUID = 5726188262756267490L;
 
 			@Override
 			public boolean filter(Event value) throws Exception {
 				return value.getName().equals("c");
 			}
-		}).followedBy("middle").where(new FilterFunction<Event>() {
+		}).followedBy("middle").where(new SimpleCondition<Event>() {
 			private static final long serialVersionUID = 5726188262756267490L;
 
 			@Override
 			public boolean filter(Event value) throws Exception {
 				return value.getName().equals("a");
 			}
-		}).zeroOrMore(true).followedBy("end1").where(new FilterFunction<Event>() {
+		}).zeroOrMore(true).followedBy("end1").where(new SimpleCondition<Event>() {
 			private static final long serialVersionUID = 5726188262756267490L;
 
 			@Override
@@ -660,14 +657,14 @@ public class NFAITCase extends TestLogger {
 		inputEvents.add(new StreamRecord<>(middleEvent3, 5));
 		inputEvents.add(new StreamRecord<>(end, 6));
 
-		Pattern<Event, ?> pattern = Pattern.<Event>begin("middle").where(new FilterFunction<Event>() {
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("middle").where(new SimpleCondition<Event>() {
 			private static final long serialVersionUID = 5726188262756267490L;
 
 			@Override
 			public boolean filter(Event value) throws Exception {
 				return value.getName().equals("a");
 			}
-		}).zeroOrMore().followedBy("end").where(new FilterFunction<Event>() {
+		}).zeroOrMore().followedBy("end").where(new SimpleCondition<Event>() {
 			private static final long serialVersionUID = 5726188262756267490L;
 
 			@Override
@@ -720,28 +717,28 @@ public class NFAITCase extends TestLogger {
 		inputEvents.add(new StreamRecord<>(middleEvent3, 5));
 		inputEvents.add(new StreamRecord<>(end, 6));
 
-		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new FilterFunction<Event>() {
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
 			private static final long serialVersionUID = 5726188262756267490L;
 
 			@Override
 			public boolean filter(Event value) throws Exception {
 				return value.getName().equals("c");
 			}
-		}).followedBy("middle-first").where(new FilterFunction<Event>() {
+		}).followedBy("middle-first").where(new SimpleCondition<Event>() {
 			private static final long serialVersionUID = 5726188262756267490L;
 
 			@Override
 			public boolean filter(Event value) throws Exception {
 				return value.getName().equals("a");
 			}
-		}).zeroOrMore(false).followedBy("middle-second").where(new FilterFunction<Event>() {
+		}).zeroOrMore(false).followedBy("middle-second").where(new SimpleCondition<Event>() {
 			private static final long serialVersionUID = 5726188262756267490L;
 
 			@Override
 			public boolean filter(Event value) throws Exception {
 				return value.getName().equals("d");
 			}
-		}).zeroOrMore(false).followedBy("end").where(new FilterFunction<Event>() {
+		}).zeroOrMore(false).followedBy("end").where(new SimpleCondition<Event>() {
 			private static final long serialVersionUID = 5726188262756267490L;
 
 			@Override
@@ -799,35 +796,35 @@ public class NFAITCase extends TestLogger {
 		inputEvents.add(new StreamRecord<>(kleene2, 7));
 		inputEvents.add(new StreamRecord<>(end, 8));
 
-		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new FilterFunction<Event>() {
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
 			private static final long serialVersionUID = 5726188262756267490L;
 
 			@Override
 			public boolean filter(Event value) throws Exception {
 				return value.getName().equals("c");
 			}
-		}).followedBy("branching").where(new FilterFunction<Event>() {
+		}).followedBy("branching").where(new SimpleCondition<Event>() {
 			private static final long serialVersionUID = 5726188262756267490L;
 
 			@Override
 			public boolean filter(Event value) throws Exception {
 				return value.getName().equals("a");
 			}
-		}).followedBy("merging").where(new FilterFunction<Event>() {
+		}).followedBy("merging").where(new SimpleCondition<Event>() {
 			private static final long serialVersionUID = 5726188262756267490L;
 
 			@Override
 			public boolean filter(Event value) throws Exception {
 				return value.getName().equals("f");
 			}
-		}).followedBy("kleene").where(new FilterFunction<Event>() {
+		}).followedBy("kleene").where(new SimpleCondition<Event>() {
 			private static final long serialVersionUID = 5726188262756267490L;
 
 			@Override
 			public boolean filter(Event value) throws Exception {
 				return value.getName().equals("d");
 			}
-		}).zeroOrMore(false).followedBy("end").where(new FilterFunction<Event>() {
+		}).zeroOrMore(false).followedBy("end").where(new SimpleCondition<Event>() {
 			private static final long serialVersionUID = 5726188262756267490L;
 
 			@Override
@@ -881,14 +878,14 @@ public class NFAITCase extends TestLogger {
 		inputEvents.add(new StreamRecord<>(middleEvent3, 4));
 		inputEvents.add(new StreamRecord<>(end, 5));
 
-		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new FilterFunction<Event>() {
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
 			private static final long serialVersionUID = 5726188262756267490L;
 
 			@Override
 			public boolean filter(Event value) throws Exception {
 				return value.getName().equals("d");
 			}
-		}).followedBy("middle").where(new FilterFunction<Event>() {
+		}).followedBy("middle").where(new SimpleCondition<Event>() {
 			private static final long serialVersionUID = 5726188262756267490L;
 
 			@Override
@@ -896,7 +893,7 @@ public class NFAITCase extends TestLogger {
 				return value.getName().equals("a");
 			}
 		}).zeroOrMore()
-			.next("end").where(new FilterFunction<Event>() {
+			.next("end").where(new SimpleCondition<Event>() {
 			private static final long serialVersionUID = 5726188262756267490L;
 
 			@Override
@@ -909,7 +906,6 @@ public class NFAITCase extends TestLogger {
 
 		Set<Set<Event>> resultingPatterns = new HashSet<>();
 
-
 		for (StreamRecord<Event> inputEvent : inputEvents) {
 			Collection<Map<String, Event>> patterns = nfa.process(
 				inputEvent.getValue(),
@@ -937,29 +933,28 @@ public class NFAITCase extends TestLogger {
 		inputEvents.add(new StreamRecord<>(middleEvent2, 3));
 		inputEvents.add(new StreamRecord<>(end, 5));
 
-		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new FilterFunction<Event>() {
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
 			private static final long serialVersionUID = 5726188262756267490L;
 
 			@Override
 			public boolean filter(Event value) throws Exception {
 				return value.getName().equals("d");
 			}
-		}).followedBy("middle").where(new FilterFunction<Event>() {
+		}).followedBy("middle").where(new SimpleCondition<Event>() {
 			private static final long serialVersionUID = 5726188262756267490L;
 
 			@Override
 			public boolean filter(Event value) throws Exception {
 				return value.getName().equals("a");
 			}
-		}).zeroOrMore(false)
-			.next("end").where(new FilterFunction<Event>() {
-				private static final long serialVersionUID = 5726188262756267490L;
+		}).zeroOrMore(false).next("end").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
 
-				@Override
-				public boolean filter(Event value) throws Exception {
-					return value.getName().equals("b");
-				}
-			});
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("b");
+			}
+		});
 
 		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
 
@@ -998,21 +993,21 @@ public class NFAITCase extends TestLogger {
 		inputEvents.add(new StreamRecord<>(middleEvent2, 4));
 		inputEvents.add(new StreamRecord<>(end1, 6));
 
-		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new FilterFunction<Event>() {
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
 			private static final long serialVersionUID = 5726188262756267490L;
 
 			@Override
 			public boolean filter(Event value) throws Exception {
 				return value.getName().equals("c");
 			}
-		}).followedBy("middle").where(new FilterFunction<Event>() {
+		}).followedBy("middle").where(new SimpleCondition<Event>() {
 			private static final long serialVersionUID = 5726188262756267490L;
 
 			@Override
 			public boolean filter(Event value) throws Exception {
 				return value.getName().equals("a");
 			}
-		}).oneOrMore(false).followedBy("end1").where(new FilterFunction<Event>() {
+		}).oneOrMore(false).followedBy("end1").where(new SimpleCondition<Event>() {
 			private static final long serialVersionUID = 5726188262756267490L;
 
 			@Override
@@ -1059,14 +1054,14 @@ public class NFAITCase extends TestLogger {
 		inputEvents.add(new StreamRecord<>(startEvent3, 5));
 		inputEvents.add(new StreamRecord<>(end1, 6));
 
-		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new FilterFunction<Event>() {
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
 			private static final long serialVersionUID = 5726188262756267490L;
 
 			@Override
 			public boolean filter(Event value) throws Exception {
 				return value.getName().equals("a");
 			}
-		}).oneOrMore(false).followedBy("end").where(new FilterFunction<Event>() {
+		}).oneOrMore(false).followedBy("end").where(new SimpleCondition<Event>() {
 			private static final long serialVersionUID = 5726188262756267490L;
 
 			@Override
@@ -1120,21 +1115,21 @@ public class NFAITCase extends TestLogger {
 		inputEvents.add(new StreamRecord<>(middleEvent3, 5L));
 		inputEvents.add(new StreamRecord<>(endEvent, 6L));
 
-		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new FilterFunction<Event>() {
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
 			private static final long serialVersionUID = 6215754202506583964L;
 
 			@Override
 			public boolean filter(Event value) throws Exception {
 				return value.getName().equals("start");
 			}
-		}).next("middle").where(new FilterFunction<Event>() {
+		}).next("middle").where(new SimpleCondition<Event>() {
 			private static final long serialVersionUID = 6215754202506583964L;
 
 			@Override
 			public boolean filter(Event value) throws Exception {
 				return value.getName().equals("middle");
 			}
-		}).zeroOrMore(false).followedBy("end").where(new FilterFunction<Event>() {
+		}).zeroOrMore(false).followedBy("end").where(new SimpleCondition<Event>() {
 			private static final long serialVersionUID = 7056763917392056548L;
 
 			@Override
@@ -1181,21 +1176,21 @@ public class NFAITCase extends TestLogger {
 		inputEvents.add(new StreamRecord<>(middleEvent3, 5));
 		inputEvents.add(new StreamRecord<>(end1, 6));
 
-		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new FilterFunction<Event>() {
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
 			private static final long serialVersionUID = 5726188262756267490L;
 
 			@Override
 			public boolean filter(Event value) throws Exception {
 				return value.getName().equals("c");
 			}
-		}).followedBy("middle").where(new FilterFunction<Event>() {
+		}).followedBy("middle").where(new SimpleCondition<Event>() {
 			private static final long serialVersionUID = 5726188262756267490L;
 
 			@Override
 			public boolean filter(Event value) throws Exception {
 				return value.getName().equals("a");
 			}
-		}).oneOrMore(true).followedBy("end1").where(new FilterFunction<Event>() {
+		}).oneOrMore(true).followedBy("end1").where(new SimpleCondition<Event>() {
 			private static final long serialVersionUID = 5726188262756267490L;
 
 			@Override
@@ -1240,21 +1235,21 @@ public class NFAITCase extends TestLogger {
 		inputEvents.add(new StreamRecord<>(middleEvent, 5));
 		inputEvents.add(new StreamRecord<>(end1, 6));
 
-		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new FilterFunction<Event>() {
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
 			private static final long serialVersionUID = 5726188262756267490L;
 
 			@Override
 			public boolean filter(Event value) throws Exception {
 				return value.getName().equals("c");
 			}
-		}).followedBy("middle").where(new FilterFunction<Event>() {
+		}).followedBy("middle").where(new SimpleCondition<Event>() {
 			private static final long serialVersionUID = 5726188262756267490L;
 
 			@Override
 			public boolean filter(Event value) throws Exception {
 				return value.getName().equals("a");
 			}
-		}).optional().followedBy("end1").where(new FilterFunction<Event>() {
+		}).optional().followedBy("end1").where(new SimpleCondition<Event>() {
 			private static final long serialVersionUID = 5726188262756267490L;
 
 			@Override
@@ -1303,21 +1298,21 @@ public class NFAITCase extends TestLogger {
 		inputEvents.add(new StreamRecord<>(middleEvent3, 4));
 		inputEvents.add(new StreamRecord<>(end1, 6));
 
-		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new FilterFunction<Event>() {
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
 			private static final long serialVersionUID = 5726188262756267490L;
 
 			@Override
 			public boolean filter(Event value) throws Exception {
 				return value.getName().equals("c");
 			}
-		}).next("middle").where(new FilterFunction<Event>() {
+		}).next("middle").where(new SimpleCondition<Event>() {
 			private static final long serialVersionUID = 5726188262756267490L;
 
 			@Override
 			public boolean filter(Event value) throws Exception {
 				return value.getName().equals("a");
 			}
-		}).times(2).followedBy("end1").where(new FilterFunction<Event>() {
+		}).times(2).followedBy("end1").where(new SimpleCondition<Event>() {
 			private static final long serialVersionUID = 5726188262756267490L;
 
 			@Override
@@ -1362,14 +1357,14 @@ public class NFAITCase extends TestLogger {
 		inputEvents.add(new StreamRecord<>(middleEvent3, 4));
 		inputEvents.add(new StreamRecord<>(end1, 6));
 
-		Pattern<Event, ?> pattern = Pattern.<Event>begin("middle").where(new FilterFunction<Event>() {
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("middle").where(new SimpleCondition<Event>() {
 			private static final long serialVersionUID = 5726188262756267490L;
 
 			@Override
 			public boolean filter(Event value) throws Exception {
 				return value.getName().equals("a");
 			}
-		}).times(2).followedBy("end1").where(new FilterFunction<Event>() {
+		}).times(2).followedBy("end1").where(new SimpleCondition<Event>() {
 			private static final long serialVersionUID = 5726188262756267490L;
 
 			@Override
@@ -1411,14 +1406,14 @@ public class NFAITCase extends TestLogger {
 		inputEvents.add(new StreamRecord<>(startEvent, 1));
 		inputEvents.add(new StreamRecord<>(end1, 6));
 
-		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new FilterFunction<Event>() {
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
 			private static final long serialVersionUID = 5726188262756267490L;
 
 			@Override
 			public boolean filter(Event value) throws Exception {
 				return value.getName().equals("c");
 			}
-		}).optional().followedBy("end1").where(new FilterFunction<Event>() {
+		}).optional().followedBy("end1").where(new SimpleCondition<Event>() {
 			private static final long serialVersionUID = 5726188262756267490L;
 
 			@Override
@@ -1464,14 +1459,14 @@ public class NFAITCase extends TestLogger {
 		inputEvents.add(new StreamRecord<>(middleEvent2, 4));
 		inputEvents.add(new StreamRecord<>(middleEvent3, 5));
 
-		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new FilterFunction<Event>() {
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
 			private static final long serialVersionUID = 5726188262756267490L;
 
 			@Override
 			public boolean filter(Event value) throws Exception {
 				return value.getName().equals("c");
 			}
-		}).followedBy("middle").where(new FilterFunction<Event>() {
+		}).followedBy("middle").where(new SimpleCondition<Event>() {
 			private static final long serialVersionUID = 5726188262756267490L;
 
 			@Override
@@ -1525,7 +1520,7 @@ public class NFAITCase extends TestLogger {
 		inputEvents.add(new StreamRecord<>(end2, 6));
 		inputEvents.add(new StreamRecord<>(end3, 6));
 
-		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new FilterFunction<Event>() {
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
 			private static final long serialVersionUID = 5726188262756267490L;
 
 			@Override
@@ -1571,14 +1566,14 @@ public class NFAITCase extends TestLogger {
 		inputEvents.add(new StreamRecord<>(startEvent, 1));
 		inputEvents.add(new StreamRecord<>(middleEvent1, 3));
 
-		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new FilterFunction<Event>() {
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
 			private static final long serialVersionUID = 5726188262756267490L;
 
 			@Override
 			public boolean filter(Event value) throws Exception {
 				return value.getName().equals("c");
 			}
-		}).followedBy("middle").where(new FilterFunction<Event>() {
+		}).followedBy("middle").where(new SimpleCondition<Event>() {
 			private static final long serialVersionUID = 5726188262756267490L;
 
 			@Override
@@ -1624,14 +1619,14 @@ public class NFAITCase extends TestLogger {
 		inputEvents.add(new StreamRecord<>(middleEvent2, 4));
 		inputEvents.add(new StreamRecord<>(middleEvent3, 5));
 
-		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new FilterFunction<Event>() {
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
 			private static final long serialVersionUID = 5726188262756267490L;
 
 			@Override
 			public boolean filter(Event value) throws Exception {
 				return value.getName().equals("c");
 			}
-		}).followedBy("middle").where(new FilterFunction<Event>() {
+		}).followedBy("middle").where(new SimpleCondition<Event>() {
 			private static final long serialVersionUID = 5726188262756267490L;
 
 			@Override
@@ -1676,21 +1671,21 @@ public class NFAITCase extends TestLogger {
 		Event middleEvent3 = new Event(43, "a", 4.0);
 		Event end1 = new Event(44, "b", 5.0);
 
-		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new FilterFunction<Event>() {
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
 			private static final long serialVersionUID = 5726188262756267490L;
 
 			@Override
 			public boolean filter(Event value) throws Exception {
 				return value.getName().equals("c");
 			}
-		}).next("middle").where(new FilterFunction<Event>() {
+		}).next("middle").where(new SimpleCondition<Event>() {
 			private static final long serialVersionUID = 5726188262756267490L;
 
 			@Override
 			public boolean filter(Event value) throws Exception {
 				return value.getName().equals("a");
 			}
-		}).times(2).followedBy("end1").where(new FilterFunction<Event>() {
+		}).times(2).followedBy("end1").where(new SimpleCondition<Event>() {
 			private static final long serialVersionUID = 5726188262756267490L;
 
 			@Override
@@ -1719,21 +1714,21 @@ public class NFAITCase extends TestLogger {
 		Event middleEvent = new Event(43, "a", 4.0);
 		Event end1 = new Event(44, "b", 5.0);
 
-		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new FilterFunction<Event>() {
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
 			private static final long serialVersionUID = 5726188262756267490L;
 
 			@Override
 			public boolean filter(Event value) throws Exception {
 				return value.getName().equals("c");
 			}
-		}).followedBy("middle").where(new FilterFunction<Event>() {
+		}).followedBy("middle").where(new SimpleCondition<Event>() {
 			private static final long serialVersionUID = 5726188262756267490L;
 
 			@Override
 			public boolean filter(Event value) throws Exception {
 				return value.getName().equals("a");
 			}
-		}).optional().followedBy("end1").where(new FilterFunction<Event>() {
+		}).optional().followedBy("end1").where(new SimpleCondition<Event>() {
 			private static final long serialVersionUID = 5726188262756267490L;
 
 			@Override
@@ -1761,21 +1756,21 @@ public class NFAITCase extends TestLogger {
 		Event middleEvent2 = new Event(42, "a", 3.0);
 		Event end1 = new Event(44, "b", 5.0);
 
-		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new FilterFunction<Event>() {
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
 			private static final long serialVersionUID = 5726188262756267490L;
 
 			@Override
 			public boolean filter(Event value) throws Exception {
 				return value.getName().equals("c");
 			}
-		}).followedBy("middle").where(new FilterFunction<Event>() {
+		}).followedBy("middle").where(new SimpleCondition<Event>() {
 			private static final long serialVersionUID = 5726188262756267490L;
 
 			@Override
 			public boolean filter(Event value) throws Exception {
 				return value.getName().equals("a");
 			}
-		}).oneOrMore(false).followedBy("end1").where(new FilterFunction<Event>() {
+		}).oneOrMore(false).followedBy("end1").where(new SimpleCondition<Event>() {
 			private static final long serialVersionUID = 5726188262756267490L;
 
 			@Override
@@ -1805,21 +1800,21 @@ public class NFAITCase extends TestLogger {
 		Event middleEvent2 = new Event(42, "a", 3.0);
 		Event end1 = new Event(44, "b", 5.0);
 
-		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new FilterFunction<Event>() {
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
 			private static final long serialVersionUID = 5726188262756267490L;
 
 			@Override
 			public boolean filter(Event value) throws Exception {
 				return value.getName().equals("c");
 			}
-		}).followedBy("middle").where(new FilterFunction<Event>() {
+		}).followedBy("middle").where(new SimpleCondition<Event>() {
 			private static final long serialVersionUID = 5726188262756267490L;
 
 			@Override
 			public boolean filter(Event value) throws Exception {
 				return value.getName().equals("a");
 			}
-		}).zeroOrMore(false).followedBy("end1").where(new FilterFunction<Event>() {
+		}).zeroOrMore(false).followedBy("end1").where(new SimpleCondition<Event>() {
 			private static final long serialVersionUID = 5726188262756267490L;
 
 			@Override
@@ -1841,4 +1836,459 @@ public class NFAITCase extends TestLogger {
 		assertEquals(true, nfa.isEmpty());
 	}
 
+
+	//////////////////////			Iterative BooleanConditions			/////////////////////////
+
+	private final Event startEvent1 = new Event(40, "start", 1.0); // shared fixtures for the iterative-condition tests; args presumably (id, name, price) - confirm against Event
+	private final Event startEvent2 = new Event(40, "start", 2.0); // the four start fixtures differ only in the third argument
+	private final Event startEvent3 = new Event(40, "start", 3.0);
+	private final Event startEvent4 = new Event(40, "start", 4.0);
+	private final SubEvent middleEvent1 = new SubEvent(41, "foo1", 1.0, 10); // SubEvent takes a fourth argument (10 in all four fixtures); its meaning is not visible here
+	private final SubEvent middleEvent2 = new SubEvent(42, "foo2", 2.0, 10);
+	private final SubEvent middleEvent3 = new SubEvent(43, "foo3", 3.0, 10);
+	private final SubEvent middleEvent4 = new SubEvent(43, "foo4", 1.0, 10); // same first argument (43) as middleEvent3; name and third argument differ
+	private final Event nextOne = new Event(44, "next-one", 1.0);
+	private final Event endEvent = new Event(46, "end", 1.0);
+
+	@Test
+	public void testIterativeWithBranchingPatternEager() { // runs the shared branching scenario with oneOrMore(true) on "middle"
+		List<List<Event>> actual = testIterativeWithBranchingPattern(true);
+
+		compareMaps(actual, // expects five matches, fewer than the oneOrMore(false) variant of this test
+				Lists.<List<Event>>newArrayList(
+						Lists.newArrayList(startEvent1, endEvent, middleEvent1, middleEvent2, middleEvent4),
+						Lists.newArrayList(startEvent1, endEvent, middleEvent2, middleEvent1),
+						Lists.newArrayList(startEvent1, endEvent, middleEvent1),
+						Lists.newArrayList(startEvent2, endEvent, middleEvent3, middleEvent4),
+						Lists.newArrayList(startEvent2, endEvent, middleEvent3)
+				)
+		);
+	}
+
+	@Test
+	public void testIterativeWithBranchingPatternCombinations() { // runs the shared branching scenario with oneOrMore(false) on "middle"
+		List<List<Event>> actual = testIterativeWithBranchingPattern(false);
+
+		compareMaps(actual, // expects thirteen matches, including every match listed by the oneOrMore(true) variant
+				Lists.<List<Event>>newArrayList(
+						Lists.newArrayList(startEvent1, endEvent, middleEvent1, middleEvent2, middleEvent4),
+						Lists.newArrayList(startEvent1, endEvent, middleEvent2, middleEvent1),
+						Lists.newArrayList(startEvent1, endEvent, middleEvent4, middleEvent3),
+						Lists.newArrayList(startEvent1, endEvent, middleEvent4, middleEvent2),
+						Lists.newArrayList(startEvent1, endEvent, middleEvent3, middleEvent1),
+						Lists.newArrayList(startEvent2, endEvent, middleEvent3, middleEvent4),
+						Lists.newArrayList(startEvent1, endEvent, middleEvent4, middleEvent1),
+						Lists.newArrayList(startEvent1, endEvent, middleEvent4),
+						Lists.newArrayList(startEvent1, endEvent, middleEvent1),
+						Lists.newArrayList(startEvent1, endEvent, middleEvent2),
+						Lists.newArrayList(startEvent1, endEvent, middleEvent3),
+						Lists.newArrayList(startEvent2, endEvent, middleEvent4),
+						Lists.newArrayList(startEvent2, endEvent, middleEvent3)
+				)
+		);
+	}
+
+	private List<List<Event>> testIterativeWithBranchingPattern(boolean eager) { // shared scenario; eager is forwarded to oneOrMore(...) on "middle"
+		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+		inputEvents.add(new StreamRecord<>(startEvent1, 1));
+		inputEvents.add(new StreamRecord<Event>(middleEvent1, 2)); // explicit <Event>: the fixture is declared as SubEvent
+		inputEvents.add(new StreamRecord<Event>(middleEvent2, 3));
+		inputEvents.add(new StreamRecord<>(startEvent2, 4));
+		inputEvents.add(new StreamRecord<Event>(middleEvent3, 5));
+		inputEvents.add(new StreamRecord<Event>(middleEvent4, 5)); // same timestamp (5) as middleEvent3
+		inputEvents.add(new StreamRecord<>(nextOne, 6)); // constructed with "next-one", which none of the name checks below accept
+		inputEvents.add(new StreamRecord<>(endEvent, 8));
+
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("start");
+			}
+		}).followedBy("middle").subtype(SubEvent.class).where(new IterativeCondition<SubEvent>() {
+			private static final long serialVersionUID = 6215754202506583964L;
+
+			@Override
+			public boolean filter(SubEvent value, Context<SubEvent> ctx) throws Exception {
+				if (!value.getName().startsWith("foo")) { // name check first; only "foo*" events are candidates
+					return false;
+				}
+
+				double sum = 0.0;
+				for (Event event : ctx.getEventsForPattern("middle")) { // presumably the events already taken for "middle" on this partial match - confirm against IterativeCondition.Context
+					sum += event.getPrice();
+				}
+				sum += value.getPrice(); // include the candidate event itself
+				return Double.compare(sum, 5.0) < 0; // accept only if the total stays strictly below 5.0
+			}
+		}).oneOrMore(eager).followedBy("end").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 7056763917392056548L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("end");
+			}
+		});
+
+		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+		List<List<Event>> resultingPatterns = new ArrayList<>();
+
+		for (StreamRecord<Event> inputEvent : inputEvents) { // feed the events one at a time, in insertion order
+			Collection<Map<String, Event>> patterns = nfa.process(
+					inputEvent.getValue(),
+					inputEvent.getTimestamp()).f0; // only f0 of the value returned by process() is used here
+
+			for (Map<String, Event> p: patterns) {
+				resultingPatterns.add(new ArrayList<>(p.values())); // copy the map's values; the keys are discarded
+			}
+		}
+
+		return resultingPatterns;
+	}
+
+	@Test
+	public void testIterativeWithLoopingStartingEager() { // runs the shared looping-start scenario with zeroOrMore(true)
+		List<List<Event>> actual = testIterativeWithLoopingStarting(true);
+
+		compareMaps(actual,
+				Lists.<List<Event>>newArrayList(
+						Lists.newArrayList(startEvent1, startEvent2, endEvent),
+						Lists.newArrayList(startEvent1, endEvent),
+						Lists.newArrayList(startEvent2, endEvent),
+						Lists.newArrayList(startEvent3, endEvent),
+						Lists.newArrayList(endEvent) // "start" is zeroOrMore, so a match consisting of endEvent alone is expected
+				)
+		);
+	}
+
+	@Test
+	public void testIterativeWithLoopingStartingCombination() { // runs the shared looping-start scenario with zeroOrMore(false)
+		List<List<Event>> actual = testIterativeWithLoopingStarting(false);
+
+		compareMaps(actual,
+				Lists.<List<Event>>newArrayList(
+						Lists.newArrayList(startEvent1, startEvent2, endEvent),
+						Lists.newArrayList(startEvent1, startEvent3, endEvent), // the one match not listed by the zeroOrMore(true) variant
+						Lists.newArrayList(startEvent1, endEvent),
+						Lists.newArrayList(startEvent2, endEvent),
+						Lists.newArrayList(startEvent3, endEvent),
+						Lists.newArrayList(endEvent) // "start" is zeroOrMore, so a match consisting of endEvent alone is expected
+				)
+		);
+	}
+
+	private List<List<Event>> testIterativeWithLoopingStarting(boolean eager) { // shared scenario; eager is forwarded to zeroOrMore(...) on "start"
+		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+		inputEvents.add(new StreamRecord<>(startEvent1, 1L));
+		inputEvents.add(new StreamRecord<>(startEvent2, 2L));
+		inputEvents.add(new StreamRecord<>(startEvent3, 3L));
+		inputEvents.add(new StreamRecord<>(endEvent, 4L));
+
+		// For now, a looping pattern inherits its continuity from the followedBy() or next() that introduces it.
+		// When the pattern graph starts with a looping pattern (as here, via begin()), the default applies,
+		// and that default is relaxed continuity (the same as followedBy()).
+
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new IterativeCondition<Event>() {
+			private static final long serialVersionUID = 6215754202506583964L;
+
+			@Override
+			public boolean filter(Event value, Context<Event> ctx) throws Exception {
+				if (!value.getName().equals("start")) { // name check first; only "start" events are candidates
+					return false;
+				}
+
+				double sum = 0.0;
+				for (Event event : ctx.getEventsForPattern("start")) { // queries the pattern's own name; presumably the events already taken for "start" - confirm against IterativeCondition.Context
+					sum += event.getPrice();
+				}
+				sum += value.getPrice(); // include the candidate event itself
+				return Double.compare(sum, 5.0) < 0; // accept only if the total stays strictly below 5.0
+			}
+		}).zeroOrMore(eager).followedBy("end").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 7056763917392056548L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("end");
+			}
+		});
+
+		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+		List<List<Event>> resultingPatterns = new ArrayList<>();
+
+		for (StreamRecord<Event> inputEvent : inputEvents) { // feed the events one at a time, in insertion order
+			Collection<Map<String, Event>> patterns = nfa.process(
+					inputEvent.getValue(),
+					inputEvent.getTimestamp()).f0; // only f0 of the value returned by process() is used here
+
+			for (Map<String, Event> p: patterns) {
+				resultingPatterns.add(new ArrayList<>(p.values())); // copy the map's values; the keys are discarded
+			}
+		}
+
+		return resultingPatterns;
+	}
+
+	@Test
+	public void testIterativeWithPrevPatternDependency() { // the "end" condition reads the events of the earlier "start" pattern
+		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+		inputEvents.add(new StreamRecord<>(startEvent1, 1L));
+		inputEvents.add(new StreamRecord<>(startEvent2, 2L));
+		inputEvents.add(new StreamRecord<>(endEvent, 4L));
+
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 6215754202506583964L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("start");
+			}
+		}).oneOrMore().followedBy("end").where(new IterativeCondition<Event>() {
+			private static final long serialVersionUID = 7056763917392056548L;
+
+			@Override
+			public boolean filter(Event value, Context<Event> ctx) throws Exception {
+				if (!value.getName().equals("end")) { // name check first; only "end" events are candidates
+					return false;
+				}
+
+				double sum = 0.0;
+				for (Event event : ctx.getEventsForPattern("start")) { // sums over a different pattern ("start"); the candidate's own price is not added
+					sum += event.getPrice();
+				}
+				return Double.compare(sum, 2.0) >= 0; // accept "end" only if the "start" prices total at least 2.0
+			}
+		});
+
+		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+		List<List<Event>> resultingPatterns = new ArrayList<>();
+
+		for (StreamRecord<Event> inputEvent : inputEvents) { // feed the events one at a time, in insertion order
+			Collection<Map<String, Event>> patterns = nfa.process(
+					inputEvent.getValue(),
+					inputEvent.getTimestamp()).f0; // only f0 of the value returned by process() is used here
+
+			for (Map<String, Event> p: patterns) {
+				resultingPatterns.add(new ArrayList<>(p.values())); // copy the map's values; the keys are discarded
+			}
+		}
+
+		compareMaps(resultingPatterns, // no {startEvent1, endEvent} entry is expected; presumably its "start" total (1.0) fails the >= 2.0 check
+				Lists.<List<Event>>newArrayList(
+						Lists.newArrayList(startEvent1, startEvent2, endEvent),
+						Lists.newArrayList(startEvent2, endEvent)
+				)
+		);
+	}
+
+	@Test
+	public void testIterativeWithABACPattern() { // "start" and "middle2" both accept events named "start", with "middle1" in between
+		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+		inputEvents.add(new StreamRecord<>(startEvent1, 1L)); //1 (these trailing numbers match each fixture's third constructor argument, presumably its price)
+		inputEvents.add(new StreamRecord<Event>(middleEvent1, 2L)); //1
+
+		inputEvents.add(new StreamRecord<>(startEvent2, 2L)); //2
+		inputEvents.add(new StreamRecord<>(startEvent3, 2L)); //3
+		inputEvents.add(new StreamRecord<Event>(middleEvent2, 2L)); //2
+
+		inputEvents.add(new StreamRecord<>(startEvent4, 2L)); //4
+		inputEvents.add(new StreamRecord<Event>(middleEvent3, 2L)); //3
+		inputEvents.add(new StreamRecord<Event>(middleEvent4, 2L)); //1
+		inputEvents.add(new StreamRecord<>(endEvent, 4L));
+
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 6215754202506583964L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("start");
+			}
+		}).followedBy("middle1").subtype(SubEvent.class).where(new SimpleCondition<SubEvent>() {
+			private static final long serialVersionUID = 2178338526904474690L;
+
+			@Override
+			public boolean filter(SubEvent value) throws Exception {
+				return value.getName().startsWith("foo");
+			}
+		}).followedBy("middle2").where(new IterativeCondition<Event>() {
+			private static final long serialVersionUID = -1223388426808292695L;
+
+			@Override
+			public boolean filter(Event value, Context<Event> ctx) throws Exception {
+				if (!value.getName().equals("start")) { // same name check as the "start" pattern
+					return false;
+				}
+
+				double sum = 0.0;
+				for (Event e: ctx.getEventsForPattern("middle2")) { // queries the pattern's own name ("middle2")
+					sum += e.getPrice();
+				}
+				sum += value.getPrice(); // include the candidate event itself
+				return Double.compare(sum, 5.0) <= 0; // inclusive bound: a total of exactly 5.0 is still accepted
+			}
+		}).oneOrMore().followedBy("end").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 562590474115118323L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("end");
+			}
+		});
+
+		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+		List<List<Event>> resultingPatterns = new ArrayList<>();
+
+		for (StreamRecord<Event> inputEvent : inputEvents) { // feed the events one at a time, in insertion order
+			Collection<Map<String, Event>> patterns = nfa.process(
+					inputEvent.getValue(),
+					inputEvent.getTimestamp()).f0; // only f0 of the value returned by process() is used here
+
+			for (Map<String, Event> p: patterns) {
+				resultingPatterns.add(new ArrayList<>(p.values())); // copy the map's values; the keys are discarded
+			}
+		}
+
+		compareMaps(resultingPatterns,
+				Lists.<List<Event>>newArrayList(
+						Lists.newArrayList(startEvent1, startEvent2, startEvent3, middleEvent1, endEvent),
+						Lists.newArrayList(startEvent1, middleEvent1, startEvent2, endEvent),
+						Lists.newArrayList(startEvent1, middleEvent2, startEvent4, endEvent),
+						Lists.newArrayList(startEvent2, middleEvent2, startEvent4, endEvent),
+						Lists.newArrayList(startEvent3, middleEvent2, startEvent4, endEvent)
+				)
+		);
+	}
+
+	@Test
+	public void testIterativeWithPrevPatternDependencyAfterBranching() {
+		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+		// Input stream, in arrival order (second argument is the record timestamp).
+		inputEvents.add(new StreamRecord<>(startEvent1, 1L));
+		inputEvents.add(new StreamRecord<>(startEvent2, 2L));
+		inputEvents.add(new StreamRecord<Event>(middleEvent1, 4L));
+		inputEvents.add(new StreamRecord<>(startEvent3, 5L));
+		inputEvents.add(new StreamRecord<Event>(middleEvent2, 6L));
+		inputEvents.add(new StreamRecord<>(endEvent, 7L));
+		// "start" (one or more) -> "middle1" (SubEvent named "foo...") -> "end" (iterative condition below).
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 6215754202506583964L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("start");
+			}
+		}).oneOrMore().followedBy("middle1").subtype(SubEvent.class).where(new SimpleCondition<SubEvent>() {
+			private static final long serialVersionUID = 2178338526904474690L;
+
+			@Override
+			public boolean filter(SubEvent value) throws Exception {
+				return value.getName().startsWith("foo");
+			}
+		}).followedBy("end").where(new IterativeCondition<Event>() {
+			private static final long serialVersionUID = 7056763917392056548L;
+
+			@Override
+			public boolean filter(Event value, Context<Event> ctx) throws Exception {
+				if (!value.getName().equals("end")) {
+					return false;
+				}
+				// Accept only if the prices of the events the context returns for "start" sum to >= 2.0.
+				double sum = 0.0;
+				for (Event event : ctx.getEventsForPattern("start")) {
+					sum += event.getPrice();
+				}
+				return Double.compare(sum, 2.0) >= 0;
+			}
+		});
+
+		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+		List<List<Event>> resultingPatterns = new ArrayList<>();
+		// Feed the events one by one and collect every match returned in field f0 of the result.
+		for (StreamRecord<Event> inputEvent : inputEvents) {
+			Collection<Map<String, Event>> patterns = nfa.process(
+					inputEvent.getValue(),
+					inputEvent.getTimestamp()).f0;
+
+			for (Map<String, Event> p: patterns) {
+				resultingPatterns.add(new ArrayList<>(p.values()));
+			}
+		}
+		// compareMaps sorts both sides before asserting, so the listing order below is not significant.
+		compareMaps(resultingPatterns,
+				Lists.<List<Event>>newArrayList(
+						Lists.newArrayList(startEvent1, startEvent2, middleEvent1, endEvent),
+						Lists.newArrayList(startEvent2, middleEvent1, endEvent),
+						Lists.newArrayList(startEvent1, startEvent2, middleEvent2, endEvent),
+						Lists.newArrayList(startEvent1, startEvent2, startEvent3, middleEvent2, endEvent),
+						Lists.newArrayList(startEvent2, startEvent3, middleEvent2, endEvent),
+						Lists.newArrayList(startEvent2, middleEvent2, endEvent),
+						Lists.newArrayList(startEvent3, middleEvent2, endEvent)
+				)
+		);
+	}
+
+	private void compareMaps(List<List<Event>> actual, List<List<Event>> expected) {
+		Assert.assertEquals(expected.size(), actual.size());
+		// Normalize both sides in place so neither event order nor match order affects the result.
+		final EventComparator byEvent = new EventComparator();
+		for (List<Event> match : actual) {
+			Collections.sort(match, byEvent);
+		}
+		for (List<Event> match : expected) {
+			Collections.sort(match, byEvent);
+		}
+		final ListEventComparator byMatch = new ListEventComparator();
+		Collections.sort(actual, byMatch);
+		Collections.sort(expected, byMatch);
+		Assert.assertArrayEquals(expected.toArray(), actual.toArray());
+	}
+
+	/** Orders matches by size first, then element-wise using {@link EventComparator}. */
+	private static class ListEventComparator implements Comparator<List<Event>> {
+		private static final EventComparator EVENT_ORDER = new EventComparator();
+
+		@Override
+		public int compare(List<Event> o1, List<Event> o2) {
+			int sizeComp = Integer.compare(o1.size(), o2.size());
+			if (sizeComp != 0) {
+				return sizeComp;
+			}
+			// Same size: the first differing position decides.
+			for (int i = 0; i < o1.size(); i++) {
+				int eventComp = EVENT_ORDER.compare(o1.get(i), o2.get(i));
+				if (eventComp != 0) {
+					return eventComp;
+				}
+			}
+			return 0;
+		}
+	}
+
+	/** Orders events by name, then price, then id. Stateless, hence a static nested class. */
+	private static class EventComparator implements Comparator<Event> {
+
+		@Override
+		public int compare(Event o1, Event o2) {
+			// Later keys are evaluated only when the earlier ones tie.
+			int nameComp = o1.getName().compareTo(o2.getName());
+			if (nameComp != 0) {
+				return nameComp;
+			}
+			int priceComp = Doubles.compare(o1.getPrice(), o2.getPrice());
+			if (priceComp != 0) {
+				return priceComp;
+			}
+			return Integer.compare(o1.getId(), o2.getId());
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7fbdc100/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java
index 40a0e7e..d2e392b 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java
@@ -19,9 +19,9 @@
 package org.apache.flink.cep.nfa;
 
 import org.apache.commons.io.output.ByteArrayOutputStream;
-import org.apache.flink.api.common.functions.FilterFunction;
 import org.apache.flink.cep.Event;
-import org.apache.flink.cep.pattern.FilterFunctions;
+import org.apache.flink.cep.pattern.conditions.BooleanConditions;
+import org.apache.flink.cep.pattern.conditions.SimpleCondition;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.util.TestLogger;
 import org.junit.Test;
@@ -58,7 +58,7 @@ public class NFATest extends TestLogger {
 
 		startState.addTake(
 			endState,
-			new FilterFunction<Event>() {
+			new SimpleCondition<Event>() {
 				private static final long serialVersionUID = -4869589195918650396L;
 
 				@Override
@@ -68,7 +68,7 @@ public class NFATest extends TestLogger {
 			});
 		endState.addTake(
 			endingState,
-			new FilterFunction<Event>() {
+			new SimpleCondition<Event>() {
 				private static final long serialVersionUID = 2979804163709590673L;
 
 				@Override
@@ -76,7 +76,7 @@ public class NFATest extends TestLogger {
 					return value.getName().equals("end");
 				}
 			});
-		endState.addIgnore(FilterFunctions.<Event>trueFunction());
+		endState.addIgnore(BooleanConditions.<Event>trueFunction());
 
 		nfa.addState(startState);
 		nfa.addState(endState);
@@ -241,7 +241,7 @@ public class NFATest extends TestLogger {
 
 		startState.addTake(
 			endState,
-			new FilterFunction<Event>() {
+			new SimpleCondition<Event>() {
 				private static final long serialVersionUID = -4869589195918650396L;
 
 				@Override
@@ -251,7 +251,7 @@ public class NFATest extends TestLogger {
 			});
 		endState.addTake(
 			endingState,
-			new FilterFunction<Event>() {
+			new SimpleCondition<Event>() {
 				private static final long serialVersionUID = 2979804163709590673L;
 
 				@Override
@@ -259,7 +259,7 @@ public class NFATest extends TestLogger {
 					return value.getName().equals("end");
 				}
 			});
-		endState.addIgnore(FilterFunctions.<Event>trueFunction());
+		endState.addIgnore(BooleanConditions.<Event>trueFunction());
 
 		nfa.addState(startState);
 		nfa.addState(endState);
@@ -268,7 +268,7 @@ public class NFATest extends TestLogger {
 		return nfa;
 	}
 
-	private static class NameFilter implements FilterFunction<Event> {
+	private static class NameFilter extends SimpleCondition<Event> {
 
 		private static final long serialVersionUID = 7472112494752423802L;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7fbdc100/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SharedBufferTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SharedBufferTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SharedBufferTest.java
index 25618d5..f0a25d2 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SharedBufferTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SharedBufferTest.java
@@ -84,11 +84,16 @@ public class SharedBufferTest extends TestLogger {
 		sharedBuffer.put("b", events[7], timestamp, "a[]", events[6], timestamp, DeweyNumber.fromString("1.1.0"));
 
 		Collection<LinkedHashMultimap<String, Event>> patterns3 = sharedBuffer.extractPatterns("b", events[7], timestamp, DeweyNumber.fromString("1.1.0"));
-		sharedBuffer.remove("b", events[7], timestamp);
+		sharedBuffer.release("b", events[7], timestamp);
 		Collection<LinkedHashMultimap<String, Event>> patterns4 = sharedBuffer.extractPatterns("b", events[7], timestamp, DeweyNumber.fromString("1.1.0"));
 		Collection<LinkedHashMultimap<String, Event>> patterns1 = sharedBuffer.extractPatterns("b", events[5], timestamp, DeweyNumber.fromString("2.0.0"));
 		Collection<LinkedHashMultimap<String, Event>> patterns2 = sharedBuffer.extractPatterns("b", events[5], timestamp, DeweyNumber.fromString("1.0.0"));
-		sharedBuffer.remove("b", events[5], timestamp);
+		sharedBuffer.release("b", events[5], timestamp);
+
+		assertEquals(1L, patterns3.size());
+		assertEquals(0L, patterns4.size());
+		assertEquals(1L, patterns1.size());
+		assertEquals(1L, patterns2.size());
 
 		assertTrue(sharedBuffer.isEmpty());
 		assertTrue(patterns4.isEmpty());

http://git-wip-us.apache.org/repos/asf/flink/blob/7fbdc100/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java
index 93d78cc..80b1bcb 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java
@@ -20,7 +20,6 @@ package org.apache.flink.cep.nfa.compiler;
 
 import com.google.common.collect.Sets;
 import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.functions.FilterFunction;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
@@ -32,6 +31,7 @@ import org.apache.flink.cep.nfa.StateTransition;
 import org.apache.flink.cep.nfa.StateTransitionAction;
 import org.apache.flink.cep.pattern.MalformedPatternException;
 import org.apache.flink.cep.pattern.Pattern;
+import org.apache.flink.cep.pattern.conditions.SimpleCondition;
 import org.apache.flink.util.TestLogger;
 import org.junit.Rule;
 import org.junit.Test;
@@ -48,7 +48,7 @@ import static org.junit.Assert.assertTrue;
 
 public class NFACompilerTest extends TestLogger {
 
-	private static final FilterFunction<Event> startFilter = new FilterFunction<Event>() {
+	private static final SimpleCondition<Event> startFilter = new SimpleCondition<Event>() {
 		private static final long serialVersionUID = 3314714776170474221L;
 
 		@Override
@@ -57,7 +57,7 @@ public class NFACompilerTest extends TestLogger {
 		}
 	};
 
-	private static final FilterFunction<Event> endFilter = new FilterFunction<Event>() {
+	private static final SimpleCondition<Event> endFilter = new SimpleCondition<Event>() {
 		private static final long serialVersionUID = 3990995859716364087L;
 
 		@Override
@@ -91,7 +91,7 @@ public class NFACompilerTest extends TestLogger {
 	 * A filter implementation to test invalid pattern specification with
 	 * duplicate pattern names. Check {@link #testNFACompilerUniquePatternName()}.
 	 */
-	private static class TestFilter implements FilterFunction<Event> {
+	private static class TestFilter extends SimpleCondition<Event> {
 
 		private static final long serialVersionUID = -3863103355752267133L;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7fbdc100/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java
index 5a3e623..b83eb3c 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.cep.operator;
 
-import org.apache.flink.api.common.functions.FilterFunction;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeutils.base.ByteSerializer;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
@@ -29,6 +28,7 @@ import org.apache.flink.cep.SubEvent;
 import org.apache.flink.cep.nfa.NFA;
 import org.apache.flink.cep.nfa.compiler.NFACompiler;
 import org.apache.flink.cep.pattern.Pattern;
+import org.apache.flink.cep.pattern.conditions.SimpleCondition;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -239,7 +239,7 @@ public class CEPMigration11to13Test {
 		}
 	}
 
-	private static class StartFilter implements FilterFunction<Event> {
+	private static class StartFilter extends SimpleCondition<Event> {
 		private static final long serialVersionUID = 5726188262756267490L;
 
 		@Override
@@ -248,7 +248,7 @@ public class CEPMigration11to13Test {
 		}
 	}
 
-	private static class MiddleFilter implements FilterFunction<SubEvent> {
+	private static class MiddleFilter extends SimpleCondition<SubEvent> {
 		private static final long serialVersionUID = 6215754202506583964L;
 
 		@Override
@@ -257,7 +257,7 @@ public class CEPMigration11to13Test {
 		}
 	}
 
-	private static class EndFilter implements FilterFunction<Event> {
+	private static class EndFilter extends SimpleCondition<Event> {
 		private static final long serialVersionUID = 7056763917392056548L;
 
 		@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/7fbdc100/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration12to13Test.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration12to13Test.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration12to13Test.java
index 65fa733..f230bbc 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration12to13Test.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration12to13Test.java
@@ -17,7 +17,6 @@
  */
 package org.apache.flink.cep.operator;
 
-import org.apache.flink.api.common.functions.FilterFunction;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
@@ -26,6 +25,7 @@ import org.apache.flink.cep.SubEvent;
 import org.apache.flink.cep.nfa.NFA;
 import org.apache.flink.cep.nfa.compiler.NFACompiler;
 import org.apache.flink.cep.pattern.Pattern;
+import org.apache.flink.cep.pattern.conditions.SimpleCondition;
 import org.apache.flink.runtime.state.KeyGroupsStateHandle;
 import org.apache.flink.runtime.state.OperatorStateHandle;
 import org.apache.flink.runtime.state.StreamStateHandle;
@@ -448,7 +448,7 @@ public class CEPMigration12to13Test {
 		}
 	}
 
-	private static class StartFilter implements FilterFunction<Event> {
+	private static class StartFilter extends SimpleCondition<Event> {
 		private static final long serialVersionUID = 5726188262756267490L;
 
 		@Override
@@ -457,7 +457,7 @@ public class CEPMigration12to13Test {
 		}
 	}
 
-	private static class MiddleFilter implements FilterFunction<SubEvent> {
+	private static class MiddleFilter extends SimpleCondition<SubEvent> {
 		private static final long serialVersionUID = 6215754202506583964L;
 
 		@Override
@@ -466,7 +466,7 @@ public class CEPMigration12to13Test {
 		}
 	}
 
-	private static class EndFilter implements FilterFunction<Event> {
+	private static class EndFilter extends SimpleCondition<Event> {
 		private static final long serialVersionUID = 7056763917392056548L;
 
 		@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/7fbdc100/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
index a99db05..726c8b8 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
@@ -19,7 +19,6 @@
 package org.apache.flink.cep.operator;
 
 import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.functions.FilterFunction;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
@@ -30,6 +29,7 @@ import org.apache.flink.cep.SubEvent;
 import org.apache.flink.cep.nfa.NFA;
 import org.apache.flink.cep.nfa.compiler.NFACompiler;
 import org.apache.flink.cep.pattern.Pattern;
+import org.apache.flink.cep.pattern.conditions.SimpleCondition;
 import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.apache.flink.streaming.api.watermark.Watermark;
@@ -434,7 +434,7 @@ public class CEPOperatorTest extends TestLogger {
 
 		harness.close();
 	}
-
+	
 	private void verifyWatermark(Object outputObject, long timestamp) {
 		assertTrue(outputObject instanceof Watermark);
 		assertEquals(timestamp, ((Watermark) outputObject).getTimestamp());
@@ -512,7 +512,7 @@ public class CEPOperatorTest extends TestLogger {
 		@Override
 		public NFA<Event> createNFA() {
 
-			Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new FilterFunction<Event>() {
+			Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
 						private static final long serialVersionUID = 5726188262756267490L;
 
 						@Override
@@ -520,7 +520,7 @@ public class CEPOperatorTest extends TestLogger {
 							return value.getName().equals("start");
 						}
 					})
-					.followedBy("middle").subtype(SubEvent.class).where(new FilterFunction<SubEvent>() {
+					.followedBy("middle").subtype(SubEvent.class).where(new SimpleCondition<SubEvent>() {
 						private static final long serialVersionUID = 6215754202506583964L;
 
 						@Override
@@ -528,7 +528,7 @@ public class CEPOperatorTest extends TestLogger {
 							return value.getVolume() > 5.0;
 						}
 					})
-					.followedBy("end").where(new FilterFunction<Event>() {
+					.followedBy("end").where(new SimpleCondition<Event>() {
 						private static final long serialVersionUID = 7056763917392056548L;
 
 						@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/7fbdc100/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPRescalingTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPRescalingTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPRescalingTest.java
index 399662a..2c86648 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPRescalingTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPRescalingTest.java
@@ -19,7 +19,6 @@
 package org.apache.flink.cep.operator;
 
 import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.functions.FilterFunction;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.cep.Event;
@@ -27,6 +26,7 @@ import org.apache.flink.cep.SubEvent;
 import org.apache.flink.cep.nfa.NFA;
 import org.apache.flink.cep.nfa.compiler.NFACompiler;
 import org.apache.flink.cep.pattern.Pattern;
+import org.apache.flink.cep.pattern.conditions.SimpleCondition;
 import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.api.windowing.time.Time;
@@ -371,7 +371,7 @@ public class CEPRescalingTest {
 		@Override
 		public NFA<Event> createNFA() {
 
-			Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new FilterFunction<Event>() {
+			Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
 				private static final long serialVersionUID = 5726188262756267490L;
 
 				@Override
@@ -379,7 +379,7 @@ public class CEPRescalingTest {
 					return value.getName().equals("start");
 				}
 			})
-				.followedBy("middle").subtype(SubEvent.class).where(new FilterFunction<SubEvent>() {
+				.followedBy("middle").subtype(SubEvent.class).where(new SimpleCondition<SubEvent>() {
 					private static final long serialVersionUID = 6215754202506583964L;
 
 					@Override
@@ -387,7 +387,7 @@ public class CEPRescalingTest {
 						return value.getVolume() > 5.0;
 					}
 				})
-				.followedBy("end").where(new FilterFunction<Event>() {
+				.followedBy("end").where(new SimpleCondition<Event>() {
 					private static final long serialVersionUID = 7056763917392056548L;
 
 					@Override


Mime
View raw message