flink-commits mailing list archives

From kklou...@apache.org
Subject [3/4] flink git commit: [FLINK-6197] [cep] Add support for iterative conditions.
Date Tue, 28 Mar 2017 16:01:18 GMT
[FLINK-6197] [cep] Add support for iterative conditions.

So far, the where clause only supported simple FilterFunction
conditions. With this, we add support for conditions where an
event is accepted not only based on its own properties, e.g.
name, as it was before, but also based on some statistic
computed over previously accepted events in the pattern, e.g.
if the price is higher than the average of the prices of the
previously accepted events.
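
The price-versus-average example above can be sketched in plain Java. This is
only an illustration under stated assumptions: `Context` and `Condition` below
are hypothetical, simplified stand-ins that mirror the two-argument `filter`
and the `getEventsForPattern` call introduced by this commit; they are not the
Flink classes themselves. The `Event` type, the `run` driver and the state name
"middle" are likewise made up for the sketch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class IterativeConditionSketch {

    /** Hypothetical stand-in for IterativeCondition.Context (not the Flink type). */
    interface Context<T> {
        Iterable<T> getEventsForPattern(String name);
    }

    /** Hypothetical stand-in for IterativeCondition (not the Flink type). */
    interface Condition<T> {
        boolean filter(T value, Context<T> ctx);
    }

    /** Minimal event type, assumed for illustration only. */
    static final class Event {
        final String name;
        final double price;

        Event(String name, double price) {
            this.name = name;
            this.price = price;
        }
    }

    /**
     * Accepts an event if its price is strictly higher than the average price
     * of the events already accepted for the state named "middle". The first
     * event is accepted unconditionally because there is no average yet.
     */
    static final Condition<Event> ABOVE_AVERAGE = (value, ctx) -> {
        double sum = 0.0;
        int count = 0;
        // Fetch the previously accepted events once and reuse the result.
        for (Event e : ctx.getEventsForPattern("middle")) {
            sum += e.price;
            count++;
        }
        return count == 0 || value.price > sum / count;
    };

    /** Feeds events through the condition, recording accepted ones under "middle". */
    static List<Event> run(List<Event> input) {
        Map<String, List<Event>> matched = new HashMap<>();
        List<Event> accepted = new ArrayList<>();
        matched.put("middle", accepted);
        Context<Event> ctx = name -> matched.getOrDefault(name, Collections.emptyList());

        for (Event e : input) {
            if (ABOVE_AVERAGE.filter(e, ctx)) {
                accepted.add(e);
            }
        }
        return accepted;
    }

    public static void main(String[] args) {
        List<Event> accepted = run(Arrays.asList(
                new Event("a", 1.0),
                new Event("b", 3.0),
                new Event("c", 2.0),
                new Event("d", 5.0)));
        List<Double> prices = new ArrayList<>();
        for (Event e : accepted) {
            prices.add(e.price);
        }
        System.out.println(prices); // prints [1.0, 3.0, 5.0]
    }
}
```

Event "c" is rejected because its price (2.0) does not exceed the running
average of the accepted events (also 2.0). A simple, non-iterative condition
would never call `getEventsForPattern` at all.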


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7fbdc100
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7fbdc100
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7fbdc100

Branch: refs/heads/master
Commit: 7fbdc100e19d69d7e31544c20fa94cb2b314ec12
Parents: f2a8bc9
Author: kl0u <kkloudas@gmail.com>
Authored: Wed Mar 22 15:52:07 2017 +0100
Committer: kl0u <kkloudas@gmail.com>
Committed: Mon Mar 27 20:11:14 2017 +0200

----------------------------------------------------------------------
 docs/dev/libs/cep.md                            |  94 ++-
 .../flink/cep/scala/pattern/Pattern.scala       |  30 +-
 .../flink/cep/scala/pattern/PatternTest.scala   |  44 +-
 .../apache/flink/cep/nfa/ComputationState.java  |  81 ++-
 .../main/java/org/apache/flink/cep/nfa/NFA.java | 251 ++++---
 .../org/apache/flink/cep/nfa/SharedBuffer.java  | 118 +--
 .../java/org/apache/flink/cep/nfa/State.java    |  18 +-
 .../apache/flink/cep/nfa/StateTransition.java   |  51 +-
 .../flink/cep/nfa/compiler/NFACompiler.java     |  47 +-
 .../flink/cep/pattern/AndFilterFunction.java    |   6 +-
 .../flink/cep/pattern/FilterFunctions.java      |  44 --
 .../flink/cep/pattern/NotFilterFunction.java    |  42 --
 .../flink/cep/pattern/OrFilterFunction.java     |   6 +-
 .../org/apache/flink/cep/pattern/Pattern.java   |  44 +-
 .../cep/pattern/SubtypeFilterFunction.java      |   6 +-
 .../cep/pattern/conditions/AndCondition.java    |  57 ++
 .../pattern/conditions/BooleanConditions.java   |  53 ++
 .../pattern/conditions/IterativeCondition.java  |  98 +++
 .../cep/pattern/conditions/NotCondition.java    |  40 +
 .../cep/pattern/conditions/OrCondition.java     |  57 ++
 .../cep/pattern/conditions/SimpleCondition.java |  39 +
 .../pattern/conditions/SubtypeCondition.java    |  41 ++
 .../java/org/apache/flink/cep/CEPITCase.java    |  42 +-
 .../org/apache/flink/cep/nfa/NFAITCase.java     | 726 +++++++++++++++----
 .../java/org/apache/flink/cep/nfa/NFATest.java  |  18 +-
 .../apache/flink/cep/nfa/SharedBufferTest.java  |   9 +-
 .../flink/cep/nfa/compiler/NFACompilerTest.java |   8 +-
 .../cep/operator/CEPMigration11to13Test.java    |   8 +-
 .../cep/operator/CEPMigration12to13Test.java    |   8 +-
 .../flink/cep/operator/CEPOperatorTest.java     |  10 +-
 .../flink/cep/operator/CEPRescalingTest.java    |   8 +-
 .../apache/flink/cep/pattern/PatternTest.java   |  52 +-
 32 files changed, 1561 insertions(+), 595 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7fbdc100/docs/dev/libs/cep.md
----------------------------------------------------------------------
diff --git a/docs/dev/libs/cep.md b/docs/dev/libs/cep.md
index 22cffbc..9d4ca91 100644
--- a/docs/dev/libs/cep.md
+++ b/docs/dev/libs/cep.md
@@ -124,13 +124,70 @@ val start : Pattern[Event, _] = Pattern.begin("start")
 </div>
 </div>
 
-Each state must have an unique name to identify the matched events later on.
+Each state must have a unique name to identify the matched events later on.
 Additionally, we can specify a filter condition for the event to be accepted as the start event via the `where` method.
+These filtering conditions can be either an `IterativeCondition` or a `SimpleCondition`. 
+
+**Iterative Conditions:** This type of condition can iterate over the previously accepted elements in the pattern and
+decide whether to accept a new element, based on some statistic over those elements.
+
+Below is the code for an iterative condition that accepts elements whose name starts with "foo" and for which the sum
+of the prices of the previously accepted elements for a state named "middle", plus the price of the current event, is
+less than 5.0. Iterative conditions can be very powerful, especially in combination with quantifiers, e.g.
+`oneToMany` or `zeroToMany`.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+start.where(new IterativeCondition<SubEvent>() {
+    @Override
+    public boolean filter(SubEvent value, Context<SubEvent> ctx) throws Exception {
+        if (!value.getName().startsWith("foo")) {
+            return false;
+        }
+        
+        double sum = 0.0;
+        for (Event event : ctx.getEventsForPattern("middle")) {
+            sum += event.getPrice();
+        }
+        sum += value.getPrice();
+        return Double.compare(sum, 5.0) < 0;
+    }
+});
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+start.where(
+    (value, ctx) => {
+        var res = value.getName.startsWith("foo")
+        if (res) {
+            var sum = 0.0
+            for (e: Event <- ctx.getEventsForPattern("middle")) {
+                sum += e.getPrice
+            }
+            sum += value.getPrice
+            res = res && sum < 5.0
+        }
+        res
+    }
+)
+{% endhighlight %}
+</div>
+</div>
+
+<span class="label label-danger">Attention</span> The call to `Context.getEventsForPattern(...)` has to find the 
+elements that belong to the pattern. The cost of this operation can vary, so when implementing your condition, try 
+to minimize the number of times the method is called.
+
+**Simple Conditions:** This type of condition extends the aforementioned `IterativeCondition` class. Such conditions are
+simple filters that decide whether to accept an element based only on properties of the element itself.
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 {% highlight java %}
-start.where(new FilterFunction<Event>() {
+start.where(new SimpleCondition<Event>() {
     @Override
     public boolean filter(Event value) {
         return ... // some condition
@@ -151,7 +208,7 @@ We can also restrict the type of the accepted event to some subtype of the initi
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 {% highlight java %}
-start.subtype(SubEvent.class).where(new FilterFunction<SubEvent>() {
+start.subtype(SubEvent.class).where(new SimpleCondition<SubEvent>() {
     @Override
     public boolean filter(SubEvent value) {
         return ... // some condition
@@ -168,7 +225,7 @@ start.subtype(classOf[SubEvent]).where(subEvent => ... /* some condition */)
 </div>
 
 As it can be seen here, the subtype condition can also be combined with an additional filter condition on the subtype.
-In fact you can always provide multiple conditions by calling `where` and `subtype` multiple times.
+In fact, you can always provide multiple conditions by calling `where` and `subtype` multiple times.
 These conditions will then be combined using the logical AND operator.
 
 In order to construct or conditions, one has to call the `or` method with a respective filter function.
@@ -177,12 +234,12 @@ Any existing filter function is then ORed with the given one.
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 {% highlight java %}
-pattern.where(new FilterFunction<Event>() {
+pattern.where(new SimpleCondition<Event>() {
     @Override
     public boolean filter(Event value) {
         return ... // some condition
     }
-}).or(new FilterFunction<Event>() {
+}).or(new SimpleCondition<Event>() {
     @Override
     public boolean filter(Event value) {
         return ... // or condition
@@ -201,8 +258,8 @@ pattern.where(event => ... /* some condition */).or(event => ... /* or condition
 Next, we can append further states to detect complex patterns.
 We can control the contiguity of two succeeding events to be accepted by the pattern.
 
-Strict contiguity means that two matching events have to succeed directly.
-This means that no other events can occur in between.
+Strict contiguity means that two matching events have to directly follow one another.
+This means that no other events can occur in between. 
 A strict contiguity pattern state can be created via the `next` method.
 
 <div class="codetabs" markdown="1">
@@ -236,7 +293,8 @@ val nonStrictNext : Pattern[Event, _] = start.followedBy("middle")
 </div>
 </div>
 It is also possible to define a temporal constraint for the pattern to be valid.
-For example, one can define that a pattern should occur within 10 seconds via the `within` method.
+For example, one can define that a pattern should occur within 10 seconds via the `within` method. 
+Temporal patterns are supported for both [processing and event time]({{site.baseurl}}/dev/event_time.html).
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
@@ -294,11 +352,11 @@ Pattern<Event, ?> followedBy = start.followedBy("next");
         <tr>
             <td><strong>Where</strong></td>
             <td>
-                <p>Defines a filter condition for the current pattern state. Only if an event passes the filter, it can match the state:</p>
+                <p>Defines a condition for the current pattern state. Only if an event satisfies the condition, it can match the state:</p>
 {% highlight java %}
-patternState.where(new FilterFunction<Event>() {
+patternState.where(new IterativeCondition<Event>() {
     @Override
-    public boolean filter(Event value) throws Exception {
+    public boolean filter(Event value, Context<Event> ctx) throws Exception {
         return ... // some condition
     }
 });
@@ -310,14 +368,14 @@ patternState.where(new FilterFunction<Event>() {
             <td>
                 <p>Adds a new filter condition which is ORed with an existing filter condition. Only if an event passes the filter condition, it can match the state:</p>
 {% highlight java %}
-patternState.where(new FilterFunction<Event>() {
+patternState.where(new IterativeCondition<Event>() {
     @Override
-    public boolean filter(Event value) throws Exception {
+    public boolean filter(Event value, Context<Event> ctx) throws Exception {
         return ... // some condition
     }
-}).or(new FilterFunction<Event>() {
+}).or(new IterativeCondition<Event>() {
     @Override
-    public boolean filter(Event value) throws Exception {
+    public boolean filter(Event value, Context<Event> ctx) throws Exception {
         return ... // alternative condition
     }
 });
@@ -684,12 +742,12 @@ DataStream<Event> partitionedInput = input.keyBy(new KeySelector<Event, Integer>
 });
 
 Pattern<Event, ?> pattern = Pattern.<Event>begin("start")
-	.next("middle").where(new FilterFunction<Event>() {
+	.next("middle").where(new SimpleCondition<Event>() {
 		@Override
 		public boolean filter(Event value) throws Exception {
 			return value.getName().equals("error");
 		}
-	}).followedBy("end").where(new FilterFunction<Event>() {
+	}).followedBy("end").where(new SimpleCondition<Event>() {
 		@Override
 		public boolean filter(Event value) throws Exception {
 			return value.getName().equals("critical");

http://git-wip-us.apache.org/repos/asf/flink/blob/7fbdc100/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala b/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala
index 5baf780..a1db460 100644
--- a/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala
+++ b/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala
@@ -17,8 +17,9 @@
  */
 package org.apache.flink.cep.scala.pattern
 
-import org.apache.flink.api.common.functions.FilterFunction
 import org.apache.flink.cep
+import org.apache.flink.cep.pattern.conditions.IterativeCondition
+import org.apache.flink.cep.pattern.conditions.IterativeCondition.Context
 import org.apache.flink.cep.pattern.{Quantifier, Pattern => JPattern}
 import org.apache.flink.streaming.api.windowing.time.Time
 
@@ -67,8 +68,8 @@ class Pattern[T , F <: T](jPattern: JPattern[T, F]) {
     *
     * @return Filter condition for an event to be matched
     */
-  def getFilterFunction(): Option[FilterFunction[F]] = {
-    Option(jPattern.getFilterFunction())
+  def getCondition(): Option[IterativeCondition[F]] = {
+    Option(jPattern.getCondition())
   }
 
   /**
@@ -127,7 +128,7 @@ class Pattern[T , F <: T](jPattern: JPattern[T, F]) {
     * @param filter Filter condition
     * @return The same pattern operator where the new filter condition is set
     */
-  def where(filter: FilterFunction[F]): Pattern[T, F] = {
+  def where(filter: IterativeCondition[F]): Pattern[T, F] = {
     jPattern.where(filter)
     this
   }
@@ -138,7 +139,7 @@ class Pattern[T , F <: T](jPattern: JPattern[T, F]) {
     * @param filter Or filter function
     * @return The same pattern operator where the new filter condition is set
     */
-  def or(filter: FilterFunction[F]): Pattern[T, F] = {
+  def or(filter: IterativeCondition[F]): Pattern[T, F] = {
     jPattern.or(filter)
     this
   }
@@ -149,11 +150,26 @@ class Pattern[T , F <: T](jPattern: JPattern[T, F]) {
     * @param filterFun Filter condition
     * @return The same pattern operator where the new filter condition is set
     */
+  def where(filterFun: (F, Context[F]) => Boolean): Pattern[T, F] = {
+    val filter = new IterativeCondition[F] {
+      val cleanFilter = cep.scala.cleanClosure(filterFun)
+
+      override def filter(value: F, ctx: Context[F]): Boolean = cleanFilter(value, ctx)
+    }
+    where(filter)
+  }
+
+  /**
+    * Specifies a filter condition which has to be fulfilled by an event in order to be matched.
+    *
+    * @param filterFun Filter condition
+    * @return The same pattern operator where the new filter condition is set
+    */
   def where(filterFun: F => Boolean): Pattern[T, F] = {
-    val filter = new FilterFunction[F] {
+    val filter = new IterativeCondition[F] {
       val cleanFilter = cep.scala.cleanClosure(filterFun)
 
-      override def filter(value: F): Boolean = cleanFilter(value)
+      override def filter(value: F, ctx: Context[F]): Boolean = cleanFilter(value)
     }
     where(filter)
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/7fbdc100/flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/pattern/PatternTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/pattern/PatternTest.scala b/flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/pattern/PatternTest.scala
index 5f49031..a95dddd 100644
--- a/flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/pattern/PatternTest.scala
+++ b/flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/pattern/PatternTest.scala
@@ -17,12 +17,13 @@
  */
 package org.apache.flink.cep.scala.pattern
 
-import org.apache.flink.api.common.functions.FilterFunction
-import org.apache.flink.cep.pattern.{AndFilterFunction, SubtypeFilterFunction, Pattern => JPattern}
+import org.apache.flink.cep.pattern.{Pattern => JPattern}
 import org.junit.Assert._
 import org.junit.Test
 import org.apache.flink.cep.Event
 import org.apache.flink.cep.SubEvent
+import org.apache.flink.cep.pattern.conditions.IterativeCondition.Context
+import org.apache.flink.cep.pattern.conditions._
 
 class PatternTest {
 
@@ -80,19 +81,19 @@ class PatternTest {
   def testStrictContiguityWithCondition: Unit = {
     val pattern = Pattern.begin[Event]("start")
       .next("next")
-      .where((value: Event) => value.getName() == "foobar")
+      .where((value: Event, ctx: Context[Event]) => value.getName() == "foobar")
       .next("end")
-      .where((value: Event) => value.getId() == 42)
+      .where((value: Event, ctx: Context[Event]) => value.getId() == 42)
 
     val jPattern = JPattern.begin[Event]("start")
       .next("next")
-      .where(new FilterFunction[Event]() {
+      .where(new SimpleCondition[Event]() {
         @throws[Exception]
         def filter(value: Event): Boolean = {
           return value.getName() == "foobar"
         }
       }).next("end")
-      .where(new FilterFunction[Event]() {
+      .where(new SimpleCondition[Event]() {
         @throws[Exception]
         def filter(value: Event): Boolean = {
           return value.getId() == 42
@@ -109,9 +110,9 @@ class PatternTest {
     assertTrue(previous.getPrevious.isDefined)
     assertFalse(preprevious.getPrevious.isDefined)
 
-    assertTrue(pattern.getFilterFunction.isDefined)
-    assertTrue(previous.getFilterFunction.isDefined)
-    assertFalse(preprevious.getFilterFunction.isDefined)
+    assertTrue(pattern.getCondition.isDefined)
+    assertTrue(previous.getCondition.isDefined)
+    assertFalse(preprevious.getCondition.isDefined)
 
     assertEquals(pattern.getName, "end")
     assertEquals(previous.getName, "next")
@@ -140,8 +141,8 @@ class PatternTest {
     assertTrue(previous.getPrevious.isDefined)
     assertFalse(preprevious.getPrevious.isDefined)
 
-    assertTrue(previous.getFilterFunction.isDefined)
-    assertTrue(previous.getFilterFunction.get.isInstanceOf[SubtypeFilterFunction[_]])
+    assertTrue(previous.getCondition.isDefined)
+    assertTrue(previous.getCondition.get.isInstanceOf[SubtypeCondition[_]])
 
     assertEquals(pattern.getName, "end")
     assertEquals(previous.getName, "subevent")
@@ -159,7 +160,7 @@ class PatternTest {
     val jpattern = JPattern.begin[Event]("start")
       .next("subevent")
       .subtype(classOf[SubEvent])
-      .where(new FilterFunction[SubEvent]() {
+      .where(new SimpleCondition[SubEvent]() {
         @throws[Exception]
         def filter(value: SubEvent): Boolean = {
           return false
@@ -178,7 +179,7 @@ class PatternTest {
     assertFalse(preprevious.getPrevious.isDefined)
 
     assertTrue(pattern.isInstanceOf[FollowedByPattern[_, _]])
-    assertTrue(previous.getFilterFunction.isDefined)
+    assertTrue(previous.getCondition().isDefined)
 
     assertEquals(pattern.getName, "end")
     assertEquals(previous.getName, "subevent")
@@ -206,8 +207,8 @@ class PatternTest {
       jPattern.getClass().getSimpleName())
       //best effort to confirm congruent filter functions
       && compareFilterFunctions(
-      pattern.getFilterFunction.orNull,
-      jPattern.getFilterFunction())
+      pattern.getCondition().orNull,
+      jPattern.getCondition())
       //recursively check previous patterns
       && checkCongruentRepresentations(
       pattern.getPrevious.orNull,
@@ -218,7 +219,8 @@ class PatternTest {
     a == b && b == c
   }
 
-  def compareFilterFunctions(sFilter: FilterFunction[_], jFilter: FilterFunction[_]): Boolean = {
+  def compareFilterFunctions(sFilter: IterativeCondition[_],
+                             jFilter: IterativeCondition[_]): Boolean = {
     /**
       * We would like to simply compare the filter functions like this:
       *
@@ -230,16 +232,16 @@ class PatternTest {
       */
     (sFilter, jFilter) match {
       //matching types: and-filter; branch and recurse for inner filters
-      case (saf: AndFilterFunction[_], jaf: AndFilterFunction[_])
+      case (saf: AndCondition[_], jaf: AndCondition[_])
       => (compareFilterFunctions(saf.getLeft(), jaf.getLeft())
         && compareFilterFunctions(saf.getRight(), jaf.getRight()))
       //matching types: subtype-filter
-      case (saf: SubtypeFilterFunction[_], jaf: SubtypeFilterFunction[_]) => true
+      case (saf: SubtypeCondition[_], jaf: SubtypeCondition[_]) => true
       //mismatch: one-sided and/subtype-filter
-      case (_: AndFilterFunction[_] | _: SubtypeFilterFunction[_], _) => false
-      case (_, _: AndFilterFunction[_] | _: SubtypeFilterFunction[_]) => false
+      case (_: AndCondition[_] | _: SubtypeCondition[_], _) => false
+      case (_, _: AndCondition[_] | _: SubtypeCondition[_]) => false
       //from here we can only check mutual presence or absence of a function
-      case (s: FilterFunction[_], j: FilterFunction[_]) => true
+      case (s: IterativeCondition[_], j: IterativeCondition[_]) => true
       case (null, null) => true
       case _ => false
     }

http://git-wip-us.apache.org/repos/asf/flink/blob/7fbdc100/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/ComputationState.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/ComputationState.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/ComputationState.java
index 445d038..80227fc 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/ComputationState.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/ComputationState.java
@@ -18,8 +18,14 @@
 
 package org.apache.flink.cep.nfa;
 
+import org.apache.flink.cep.pattern.conditions.IterativeCondition;
 import org.apache.flink.util.Preconditions;
 
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
 /**
  * Helper class which encapsulates the state of the NFA computation. It points to the current state,
  * the last taken event, its occurrence timestamp, the current version and the starting timestamp
@@ -45,7 +51,10 @@ public class ComputationState<T> {
 
 	private final State<T> previousState;
 
+	private final ConditionContext conditionContext;
+
 	private ComputationState(
+			final NFA<T> nfa,
 			final State<T> currentState,
 			final State<T> previousState,
 			final T event,
@@ -58,6 +67,11 @@ public class ComputationState<T> {
 		this.version = version;
 		this.startTimestamp = startTimestamp;
 		this.previousState = previousState;
+		this.conditionContext = new ConditionContext(nfa, this);
+	}
+
+	public ConditionContext getConditionContext() {
+		return conditionContext;
 	}
 
 	public boolean isFinalState() {
@@ -92,23 +106,80 @@ public class ComputationState<T> {
 		return version;
 	}
 
-	public static <T> ComputationState<T> createStartState(final State<T> state) {
+	public static <T> ComputationState<T> createStartState(final NFA<T> nfa, final State<T> state) {
 		Preconditions.checkArgument(state.isStart());
-		return new ComputationState<>(state, null, null, -1L, new DeweyNumber(1), -1L);
+		return new ComputationState<>(nfa, state, null, null, -1L, new DeweyNumber(1), -1L);
 	}
 
-	public static <T> ComputationState<T> createStartState(final State<T> state, final DeweyNumber version) {
+	public static <T> ComputationState<T> createStartState(final NFA<T> nfa, final State<T> state, final DeweyNumber version) {
 		Preconditions.checkArgument(state.isStart());
-		return new ComputationState<>(state, null, null, -1L, version, -1L);
+		return new ComputationState<>(nfa, state, null, null, -1L, version, -1L);
 	}
 
 	public static <T> ComputationState<T> createState(
+			final NFA<T> nfa,
 			final State<T> currentState,
 			final State<T> previousState,
 			final T event,
 			final long timestamp,
 			final DeweyNumber version,
 			final long startTimestamp) {
-		return new ComputationState<>(currentState, previousState, event, timestamp, version, startTimestamp);
+		return new ComputationState<>(nfa, currentState, previousState, event, timestamp, version, startTimestamp);
+	}
+
+	/**
+	 * The context used when evaluating this computation state.
+	 */
+	public class ConditionContext implements IterativeCondition.Context<T> {
+
+		private static final long serialVersionUID = -6733978464782277795L;
+
+		/**
+		 * A flag indicating if we should recompute the matching pattern, so that
+		 * the {@link IterativeCondition iterative condition} can be evaluated.
+		 */
+		private boolean shouldUpdate;
+
+		/** The current computation state. */
+		private transient ComputationState<T> computationState;
+
+		/** The owning {@link NFA} of this computation state. */
+		private final NFA<T> nfa;
+
+		/**
+		 * The matched pattern so far. A condition will be evaluated over this
+		 * pattern. This is evaluated <b>only once</b>, as this is an expensive
+		 * operation that traverses a path in the {@link SharedBuffer}.
+		 */
+		private transient Map<String, List<T>> matchedEvents;
+
+		public ConditionContext(NFA<T> nfa, ComputationState<T> computationState) {
+			this.nfa = nfa;
+			this.computationState = computationState;
+			this.shouldUpdate = true;
+		}
+
+		@Override
+		public Iterable<T> getEventsForPattern(final String key) {
+			Preconditions.checkNotNull(key);
+
+			// the (partially) matched pattern is computed lazily when this method is called.
+			// this is to avoid any overheads when using a simple, non-iterative condition.
+
+			if (shouldUpdate) {
+				this.matchedEvents = nfa.extractCurrentMatches(computationState);
+				shouldUpdate = false;
+			}
+
+			return new Iterable<T>() {
+				@Override
+				public Iterator<T> iterator() {
+					List<T> elements = matchedEvents.get(key);
+					return elements == null
+							? Collections.EMPTY_LIST.<T>iterator()
+							: elements.iterator();
+				}
+			};
+		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7fbdc100/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
index ab03566..cddc1ed 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
@@ -21,18 +21,19 @@ package org.apache.flink.cep.nfa;
 import com.google.common.base.Predicate;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.LinkedHashMultimap;
-import org.apache.flink.api.common.functions.FilterFunction;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
 import org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream;
 import org.apache.flink.cep.NonDuplicatingTypeSerializer;
 import org.apache.flink.cep.nfa.compiler.NFACompiler;
+import org.apache.flink.cep.pattern.conditions.IterativeCondition;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.util.Preconditions;
 
 import javax.annotation.Nullable;
 import java.io.ByteArrayInputStream;
@@ -140,10 +141,9 @@ public class NFA<T> implements Serializable {
 		this.nonDuplicatingTypeSerializer = new NonDuplicatingTypeSerializer<>(eventSerializer);
 		this.windowTime = windowTime;
 		this.handleTimeout = handleTimeout;
-		stringSharedBuffer = new SharedBuffer<>(nonDuplicatingTypeSerializer);
-		computationStates = new LinkedList<>();
-
-		states = new HashSet<>();
+		this.stringSharedBuffer = new SharedBuffer<>(nonDuplicatingTypeSerializer);
+		this.computationStates = new LinkedList<>();
+		this.states = new HashSet<>();
 	}
 
 	public Set<State<T>> getStates() {
@@ -160,7 +160,7 @@ public class NFA<T> implements Serializable {
 		states.add(state);
 
 		if (state.isStart()) {
-			computationStates.add(ComputationState.createStartState(state));
+			computationStates.add(ComputationState.createStartState(this, state));
 		}
 	}
 
@@ -214,10 +214,6 @@ public class NFA<T> implements Serializable {
 						computationState.getPreviousState().getName(),
 						computationState.getEvent(),
 						computationState.getTimestamp());
-				stringSharedBuffer.remove(
-						computationState.getPreviousState().getName(),
-						computationState.getEvent(),
-						computationState.getTimestamp());
 
 				newComputationStates = Collections.emptyList();
 			} else if (event != null) {
@@ -233,8 +229,10 @@ public class NFA<T> implements Serializable {
 					result.addAll(matches);
 
 					// remove found patterns because they are no longer needed
-					stringSharedBuffer.release(newComputationState.getPreviousState().getName(), newComputationState.getEvent(), newComputationState.getTimestamp());
-					stringSharedBuffer.remove(newComputationState.getPreviousState().getName(), newComputationState.getEvent(), newComputationState.getTimestamp());
+					stringSharedBuffer.release(
+							newComputationState.getPreviousState().getName(),
+							newComputationState.getEvent(),
+							newComputationState.getTimestamp());
 				} else {
 					// add new computation state; it will be processed once the next event arrives
 					computationStates.add(newComputationState);
@@ -332,23 +330,29 @@ public class NFA<T> implements Serializable {
 	/**
 	 * Computes the next computation states based on the given computation state, the current event,
 	 * its timestamp and the internal state machine. The algorithm is:
-	 *
-	 * 1. Decide on valid transitions and number of branching paths. See {@link OutgoingEdges}
-	 * 2. Perform transitions:
-	 *      a) IGNORE (links in {@link SharedBuffer} will still point to the previous event)
-	 *          - do not perform for Start State - special case
-	 *          - if stays in the same state increase the current stage for future use with number of
-	 *            outgoing edges
-	 *          - if after PROCEED increase current stage and add new stage (as we change the state)
-	 *          - lock the entry in {@link SharedBuffer} as it is needed in the created branch
-	 *      b) TAKE (links in {@link SharedBuffer} will point to the current event)
-	 *          - add entry to the shared buffer with version of the current computation state
-	 *          - add stage and then increase with number of takes for the future computation states
-	 *          - peek to the next state if it has PROCEED path to a Final State, if true create
-	 *            Final ComputationState to emit results
-	 * 3. Handle the Start State, as it always have to remain
-	 * 4. Release the corresponding entries in {@link SharedBuffer}.
-	 *
+	 * <ol>
+	 *     <li>Decide on valid transitions and number of branching paths. See {@link OutgoingEdges}</li>
+	 *     <li>Perform transitions:
+	 *         <ol>
+	 *             <li>IGNORE (links in {@link SharedBuffer} will still point to the previous event)
+	 *                 <ul>
+	 *                     <li>do not perform for the Start State (special case)</li>
+	 *                     <li>if it stays in the same state, increase the current stage for future use with the number of outgoing edges</li>
+	 *                     <li>if it comes after a PROCEED, increase the current stage and add a new stage (as we change the state)</li>
+	 *                     <li>lock the entry in {@link SharedBuffer} as it is needed in the created branch</li>
+	 *                 </ul></li>
+	 *             <li>TAKE (links in {@link SharedBuffer} will point to the current event)
+	 *                 <ul>
+	 *                     <li>add an entry to the shared buffer with the version of the current computation state</li>
+	 *                     <li>add a stage and then increase it with the number of takes for the future computation states</li>
+	 *                     <li>peek at the next state to see if it has a PROCEED path to a Final State; if so, create a Final
+	 *                     ComputationState to emit results</li>
+	 *                 </ul></li>
+	 *         </ol>
+	 *     </li>
+	 *     <li>Handle the Start State, as it always has to remain</li>
+	 *     <li>Release the corresponding entries in {@link SharedBuffer}.</li>
+	 * </ol>
 	 *
 	 * @param computationState Current computation state
 	 * @param event Current event which is processed
@@ -387,85 +391,81 @@ public class NFA<T> implements Serializable {
 							ignoreBranchesToVisit--;
 						}
 
-						resultingComputationStates.add(
-							ComputationState.createState(
+						addComputationState(
+								resultingComputationStates,
 								edge.getTargetState(),
 								computationState.getPreviousState(),
 								computationState.getEvent(),
 								computationState.getTimestamp(),
 								version,
 								computationState.getStartTimestamp()
-							)
 						);
-						stringSharedBuffer.lock(
-							computationState.getPreviousState().getName(),
-							computationState.getEvent(),
-							computationState.getTimestamp());
 					}
 				}
 				break;
 				case TAKE:
-					final State<T> newState = edge.getTargetState();
-					final State<T> consumingState = edge.getSourceState();
-					final State<T> previousEventState = computationState.getPreviousState();
+					final State<T> nextState = edge.getTargetState();
+					final State<T> currentState = edge.getSourceState();
+					final State<T> previousState = computationState.getPreviousState();
 
 					final T previousEvent = computationState.getEvent();
-					final DeweyNumber currentVersion = computationState.getVersion();
 
-					final DeweyNumber newComputationStateVersion = new DeweyNumber(currentVersion).addStage().increase(takeBranchesToVisit);
+					final DeweyNumber currentVersion = computationState.getVersion();
+					final DeweyNumber nextVersion = new DeweyNumber(currentVersion).addStage().increase(takeBranchesToVisit);
 					takeBranchesToVisit--;
 
 					final long startTimestamp;
 					if (computationState.isStartState()) {
 						startTimestamp = timestamp;
 						stringSharedBuffer.put(
-							consumingState.getName(),
+							currentState.getName(),
 							event,
 							timestamp,
 							currentVersion);
 					} else {
 						startTimestamp = computationState.getStartTimestamp();
 						stringSharedBuffer.put(
-							consumingState.getName(),
+							currentState.getName(),
 							event,
 							timestamp,
-							previousEventState.getName(),
+							previousState.getName(),
 							previousEvent,
 							computationState.getTimestamp(),
 							currentVersion);
 					}
 
-					// a new computation state is referring to the shared entry
-					stringSharedBuffer.lock(consumingState.getName(), event, timestamp);
-
-					resultingComputationStates.add(ComputationState.createState(
-						newState,
-						consumingState,
-						event,
-						timestamp,
-						newComputationStateVersion,
-						startTimestamp
-					));
+					addComputationState(
+							resultingComputationStates,
+							nextState,
+							currentState,
+							event,
+							timestamp,
+							nextVersion,
+							startTimestamp);
 
 					// check if the newly created state is optional (has a PROCEED path to a Final state)
-					final State<T> finalState = findFinalStateAfterProceed(newState, event);
+					final State<T> finalState = findFinalStateAfterProceed(nextState, event, computationState);
 					if (finalState != null) {
-						stringSharedBuffer.lock(consumingState.getName(), event, timestamp);
-						resultingComputationStates.add(ComputationState.createState(
-							finalState,
-							consumingState,
-							event,
-							timestamp,
-							newComputationStateVersion,
-							startTimestamp));
+						addComputationState(
+								resultingComputationStates,
+								finalState,
+								currentState,
+								event,
+								timestamp,
+								nextVersion,
+								startTimestamp);
 					}
 					break;
 			}
 		}
 
 		if (computationState.isStartState()) {
-			final int totalBranches = calculateIncreasingSelfState(outgoingEdges.getTotalIgnoreBranches(), outgoingEdges.getTotalTakeBranches());
-			final ComputationState<T> startState = createStartState(computationState, totalBranches);
+			int totalBranches = calculateIncreasingSelfState(
+					outgoingEdges.getTotalIgnoreBranches(),
+					outgoingEdges.getTotalTakeBranches());
+
+			DeweyNumber startVersion = computationState.getVersion().increase(totalBranches);
+			ComputationState<T> startState = ComputationState.createStartState(this, computationState.getState(), startVersion);
 			resultingComputationStates.add(startState);
 		}
 
@@ -475,17 +475,26 @@ public class NFA<T> implements Serializable {
 				computationState.getPreviousState().getName(),
 				computationState.getEvent(),
 				computationState.getTimestamp());
-			// try to remove unnecessary shared buffer entries
-			stringSharedBuffer.remove(
-				computationState.getPreviousState().getName(),
-				computationState.getEvent(),
-				computationState.getTimestamp());
 		}
 
 		return resultingComputationStates;
 	}
 
-	private State<T> findFinalStateAfterProceed(State<T> state, T event) {
+	private void addComputationState(
+			List<ComputationState<T>> computationStates,
+			State<T> currentState,
+			State<T> previousState,
+			T event,
+			long timestamp,
+			DeweyNumber version,
+			long startTimestamp) {
+		ComputationState<T> computationState = ComputationState.createState(
+				this, currentState, previousState, event, timestamp, version, startTimestamp);
+		computationStates.add(computationState);
+		stringSharedBuffer.lock(previousState.getName(), event, timestamp);
+	}
+
+	private State<T> findFinalStateAfterProceed(State<T> state, T event, ComputationState<T> computationState) {
 		final Stack<State<T>> statesToCheck = new Stack<>();
 		statesToCheck.push(state);
 
@@ -494,7 +503,7 @@ public class NFA<T> implements Serializable {
 				final State<T> currentState = statesToCheck.pop();
 				for (StateTransition<T> transition : currentState.getStateTransitions()) {
 					if (transition.getAction() == StateTransitionAction.PROCEED &&
-						checkFilterCondition(transition.getCondition(), event)) {
+							checkFilterCondition(computationState, transition.getCondition(), event)) {
 						if (transition.getTargetState().isFinal()) {
 							return transition.getTargetState();
 						} else {
@@ -514,15 +523,12 @@ public class NFA<T> implements Serializable {
 		return takeBranches == 0 && ignoreBranches == 0 ? 0 : ignoreBranches + 1;
 	}
 
-	private ComputationState<T> createStartState(final ComputationState<T> computationState, final int totalBranches) {
-		final DeweyNumber startVersion = computationState.getVersion().increase(totalBranches);
-		return ComputationState.createStartState(computationState.getState(), startVersion);
-	}
-
 	private OutgoingEdges<T> createDecisionGraph(ComputationState<T> computationState, T event) {
+		final OutgoingEdges<T> outgoingEdges = new OutgoingEdges<>(computationState.getState());
+
 		final Stack<State<T>> states = new Stack<>();
 		states.push(computationState.getState());
-		final OutgoingEdges<T> outgoingEdges = new OutgoingEdges<>(computationState.getState());
+
 		//First create all outgoing edges, so to be able to reason about the Dewey version
 		while (!states.isEmpty()) {
 			State<T> currentState = states.pop();
@@ -531,7 +537,7 @@ public class NFA<T> implements Serializable {
 			// check all state transitions for each state
 			for (StateTransition<T> stateTransition : stateTransitions) {
 				try {
-					if (checkFilterCondition(stateTransition.getCondition(), event)) {
+					if (checkFilterCondition(computationState, stateTransition.getCondition(), event)) {
 						// filter condition is true
 						switch (stateTransition.getAction()) {
 							case PROCEED:
@@ -553,9 +559,38 @@ public class NFA<T> implements Serializable {
 		return outgoingEdges;
 	}
 
+	private boolean checkFilterCondition(ComputationState<T> computationState, IterativeCondition<T> condition, T event) throws Exception {
+		return condition == null || condition.filter(event, computationState.getConditionContext());
+	}
+
+	Map<String, List<T>> extractCurrentMatches(final ComputationState<T> computationState) {
+		if (computationState.getPreviousState() == null) {
+			return new HashMap<>();
+		}
+
+		Collection<LinkedHashMultimap<String, T>> paths = stringSharedBuffer.extractPatterns(
+				computationState.getPreviousState().getName(),
+				computationState.getEvent(),
+				computationState.getTimestamp(),
+				computationState.getVersion());
 
-	private boolean checkFilterCondition(FilterFunction<T> condition, T event) throws Exception {
-		return condition == null || condition.filter(event);
+		// for a given computation state, we cannot have more than one matching pattern.
+		Preconditions.checkArgument(paths.size() <= 1);
+
+		TypeSerializer<T> serializer = nonDuplicatingTypeSerializer.getTypeSerializer();
+
+		Map<String, List<T>> result = new HashMap<>();
+		for (LinkedHashMultimap<String, T> path: paths) {
+			for (String key: path.keySet()) {
+				Set<T> events = path.get(key);
+				List<T> values = new ArrayList<>(events.size());
+				for (T event: events) {
+					values.add(serializer.isImmutableType() ? event : serializer.copy(event));
+				}
+				result.put(key, values);
+			}
+		}
+		return result;
 	}
 
 	/**
@@ -573,6 +608,9 @@ public class NFA<T> implements Serializable {
 			computationState.getTimestamp(),
 			computationState.getVersion());
 
+		// for a given computation state, we cannot have more than one matching pattern.
+		Preconditions.checkArgument(paths.size() <= 1);
+
 		List<Map<String, T>> result = new ArrayList<>();
 
 		TypeSerializer<T> serializer = nonDuplicatingTypeSerializer.getTypeSerializer();
@@ -601,6 +639,28 @@ public class NFA<T> implements Serializable {
 		return result;
 	}
 
+	/**
+	 * Generates a state name from a given name template and an index.
+	 * <p>
+	 * If the template ends with "[]" the index is inserted in between the square brackets.
+	 * Otherwise, an underscore and the index is appended to the name.
+	 *
+	 * @param name Name template
+	 * @param index Index of the state
+	 * @return Generated state name from the given state name template
+	 */
+	static String generateStateName(final String name, final int index) {
+		Matcher matcher = namePattern.matcher(name);
+
+		if (matcher.matches()) {
+			return matcher.group(1) + index + matcher.group(2);
+		} else {
+			return name + "_" + index;
+		}
+	}
+
+	//////////////////////			Fault-Tolerance / Migration			//////////////////////
+
 	private void writeObject(ObjectOutputStream oos) throws IOException {
 		oos.defaultWriteObject();
 
@@ -692,6 +752,7 @@ public class NFA<T> implements Serializable {
 				final State<T> previousState = convertedStates.get(previousName);
 
 				computationStates.add(ComputationState.createState(
+					this,
 					convertedStates.get(currentName),
 					previousState,
 					readState.getEvent(),
@@ -710,6 +771,7 @@ public class NFA<T> implements Serializable {
 		}).getName();
 
 		computationStates.add(ComputationState.createStartState(
+			this,
 			convertedStates.get(startName),
 			new DeweyNumber(this.startEventCounter)));
 
@@ -761,32 +823,13 @@ public class NFA<T> implements Serializable {
 			event = null;
 		}
 
-		return ComputationState.createState(state, previousState, event, timestamp, version, startTimestamp);
+		return ComputationState.createState(this, state, previousState, event, timestamp, version, startTimestamp);
 	}
 
+	//////////////////////			Serialization			//////////////////////
 
 	/**
-	 * Generates a state name from a given name template and an index.
-	 * <p>
-	 * If the template ends with "[]" the index is inserted in between the square brackets.
-	 * Otherwise, an underscore and the index is appended to the name.
-	 *
-	 * @param name Name template
-	 * @param index Index of the state
-	 * @return Generated state name from the given state name template
-	 */
-	static String generateStateName(final String name, final int index) {
-		Matcher matcher = namePattern.matcher(name);
-
-		if (matcher.matches()) {
-			return matcher.group(1) + index + matcher.group(2);
-		} else {
-			return name + "_" + index;
-		}
-	}
-
-	/**
-	 * {@link TypeSerializer} for {@link NFA} that uses Java Serialization.
+	 * A {@link TypeSerializer} for {@link NFA} that uses Java Serialization.
 	 */
 	public static class Serializer<T> extends TypeSerializer<NFA<T>> {
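
The generateStateName helper relocated in this diff relies on a namePattern regex that none of the hunks show. A minimal sketch of the documented behaviour follows. The regex (a trailing "[]" captured as two groups) and the class name StateNameSketch are assumptions made for illustration, not taken from the commit.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class StateNameSketch {

    // Assumed regex: group 1 is everything up to and including "[", group 2 is the final "]".
    private static final Pattern NAME_PATTERN = Pattern.compile("^(.*\\[)(\\])$");

    /** Inserts the index between a trailing "[]", or appends "_" + index otherwise. */
    static String generateStateName(String name, int index) {
        Matcher matcher = NAME_PATTERN.matcher(name);
        if (matcher.matches()) {
            return matcher.group(1) + index + matcher.group(2);
        }
        return name + "_" + index;
    }

    public static void main(String[] args) {
        System.out.println(generateStateName("middle[]", 2)); // prints middle[2]
        System.out.println(generateStateName("middle", 2));   // prints middle_2
    }
}
```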
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7fbdc100/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
index d5b7876..ccc6884 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
@@ -52,12 +52,14 @@ import java.util.Stack;
  *
  * The implementation is strongly based on the paper "Efficient Pattern Matching over Event Streams".
  *
- * @see <a href="https://people.cs.umass.edu/~yanlei/publications/sase-sigmod08.pdf">https://people.cs.umass.edu/~yanlei/publications/sase-sigmod08.pdf</a>
+ * @see <a href="https://people.cs.umass.edu/~yanlei/publications/sase-sigmod08.pdf">
+ *     https://people.cs.umass.edu/~yanlei/publications/sase-sigmod08.pdf</a>
  *
  * @param <K> Type of the keys
  * @param <V> Type of the values
  */
 public class SharedBuffer<K extends Serializable, V> implements Serializable {
+
 	private static final long serialVersionUID = 9213251042562206495L;
 
 	private final TypeSerializer<V> valueSerializer;
@@ -66,20 +68,20 @@ public class SharedBuffer<K extends Serializable, V> implements Serializable {
 
 	public SharedBuffer(final TypeSerializer<V> valueSerializer) {
 		this.valueSerializer = valueSerializer;
-		pages = new HashMap<>();
+		this.pages = new HashMap<>();
 	}
 
 	/**
 	 * Stores given value (value + timestamp) under the given key. It assigns a preceding element
 	 * relation to the entry which is defined by the previous key, value (value + timestamp).
 	 *
-	 * @param key Key of the current value
-	 * @param value Current value
-	 * @param timestamp Timestamp of the current value (a value requires always a timestamp to make it uniquely referable))
-	 * @param previousKey Key of the value for the previous relation
-	 * @param previousValue Value for the previous relation
+	 * @param key               Key of the current value
+	 * @param value             Current value
+	 * @param timestamp         Timestamp of the current value (a value always requires a timestamp to make it uniquely referable)
+	 * @param previousKey       Key of the value for the previous relation
+	 * @param previousValue     Value for the previous relation
 	 * @param previousTimestamp Timestamp of the value for the previous relation
-	 * @param version Version of the previous relation
+	 * @param version           Version of the previous relation
 	 */
 	public void put(
 			final K key,
@@ -89,14 +91,6 @@ public class SharedBuffer<K extends Serializable, V> implements Serializable {
 			final V previousValue,
 			final long previousTimestamp,
 			final DeweyNumber version) {
-		SharedBufferPage<K, V> page;
-
-		if (!pages.containsKey(key)) {
-			page = new SharedBufferPage<K, V>(key);
-			pages.put(key, page);
-		} else {
-			page = pages.get(key);
-		}
 
 		final SharedBufferEntry<K, V> previousSharedBufferEntry = get(previousKey, previousValue, previousTimestamp);
 
@@ -108,55 +102,41 @@ public class SharedBuffer<K extends Serializable, V> implements Serializable {
 				"relation has been already pruned, even though you expect it to be still there.");
 		}
 
-		page.add(
-			new ValueTimeWrapper<>(value, timestamp),
-			previousSharedBufferEntry,
-			version);
+		put(key, value, timestamp, previousSharedBufferEntry, version);
 	}
 
 	/**
 	 * Stores given value (value + timestamp) under the given key. It assigns no preceding element
 	 * relation to the entry.
 	 *
-	 * @param key Key of the current value
-	 * @param value Current value
+	 * @param key       Key of the current value
+	 * @param value     Current value
 	 * @param timestamp Timestamp of the current value (a value always requires a timestamp to make it uniquely referable)
-	 * @param version Version of the previous relation
+	 * @param version   Version of the previous relation
 	 */
 	public void put(
-		final K key,
-		final V value,
-		final long timestamp,
-		final DeweyNumber version) {
-		SharedBufferPage<K, V> page;
-
-		if (!pages.containsKey(key)) {
-			page = new SharedBufferPage<K, V>(key);
-			pages.put(key, page);
-		} else {
-			page = pages.get(key);
-		}
+			final K key,
+			final V value,
+			final long timestamp,
+			final DeweyNumber version) {
 
-		page.add(
-			new ValueTimeWrapper<>(value, timestamp),
-			null,
-			version);
+		put(key, value, timestamp, null, version);
 	}
 
-	/**
-	 * Checks whether the given key, value, timestamp triple is contained in the shared buffer
-	 *
-	 * @param key Key of the value
-	 * @param value Value
-	 * @param timestamp Timestamp of the value
-	 * @return Whether a value with the given timestamp is registered under the given key
-	 */
-	public boolean contains(
-		final K key,
-		final V value,
-		final long timestamp) {
+	private void put(
+			final K key,
+			final V value,
+			final long timestamp,
+			final SharedBufferEntry<K, V> previousSharedBufferEntry,
+			final DeweyNumber version) {
 
-		return pages.containsKey(key) && pages.get(key).contains(new ValueTimeWrapper<>(value, timestamp));
+		SharedBufferPage<K, V> page = pages.get(key);
+		if (page == null) {
+			page = new SharedBufferPage<>(key);
+			pages.put(key, page);
+		}
+
+		page.add(new ValueTimeWrapper<>(value, timestamp), previousSharedBufferEntry, version);
 	}
 
 	public boolean isEmpty() {
@@ -272,47 +252,29 @@ public class SharedBuffer<K extends Serializable, V> implements Serializable {
 	 * Increases the reference counter for the given value, key, timestamp entry so that it is not
 	 * accidentally removed.
 	 *
-	 * @param key Key of the value to lock
-	 * @param value Value to lock
+	 * @param key       Key of the value to lock
+	 * @param value     Value to lock
 	 * @param timestamp Timestamp of the value to lock
 	 */
 	public void lock(final K key, final V value, final long timestamp) {
 		SharedBufferEntry<K, V> entry = get(key, value, timestamp);
-
 		if (entry != null) {
 			entry.increaseReferenceCounter();
 		}
 	}
 
 	/**
-	 * Decreases the reference counter for the given value, key, timstamp entry so that it can be
+	 * Decreases the reference counter for the given value, key, timestamp entry so that it can be
 	 * removed once the reference counter reaches 0.
 	 *
-	 * @param key Key of the value to release
-	 * @param value Value to release
+	 * @param key       Key of the value to release
+	 * @param value     Value to release
 	 * @param timestamp Timestamp of the value to release
 	 */
 	public void release(final K key, final V value, final long timestamp) {
 		SharedBufferEntry<K, V> entry = get(key, value, timestamp);
-
-		if (entry != null ) {
-			entry.decreaseReferenceCounter();
-		}
-	}
-
-	/**
-	 * Removes the given value, key, timestamp entry if its reference counter is 0. It will also
-	 * release the next element in its previous relation and apply remove to this element
-	 * recursively.
-	 *
-	 * @param key Key of the value to remove
-	 * @param value Value to remove
-	 * @param timestamp Timestamp of the value to remvoe
-	 */
-	public void remove(final K key, final V value, final long timestamp) {
-		SharedBufferEntry<K, V> entry = get(key, value, timestamp);
-
 		if (entry != null) {
+			entry.decreaseReferenceCounter();
 			internalRemove(entry);
 		}
 	}
@@ -626,10 +588,6 @@ public class SharedBuffer<K extends Serializable, V> implements Serializable {
 			sharedBufferEntry.addEdge(newEdge);
 		}
 
-		public boolean contains(final ValueTimeWrapper<V> valueTime) {
-			return entries.containsKey(valueTime);
-		}
-
 		public SharedBufferEntry<K, V> get(final ValueTimeWrapper<V> valueTime) {
 			return entries.get(valueTime);
 		}
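
The lock and release methods in this file implement reference counting on shared buffer entries. After this change, release also attempts the removal that used to be a separate remove call. A simplified sketch of that life cycle follows. It assumes plain string keys in place of (key, value, timestamp) triples, and it assumes that storing an entry pins its predecessor. The class and its methods are illustrative only and do not reproduce internalRemove, which this diff does not show.

```java
import java.util.HashMap;
import java.util.Map;

final class RefCountedBufferSketch {

    private static final class Entry {
        final String previousKey; // null when the entry has no predecessor
        int referenceCounter;

        Entry(String previousKey) {
            this.previousKey = previousKey;
        }
    }

    private final Map<String, Entry> entries = new HashMap<>();

    /** Stores an entry; assumption: the new entry holds one reference on its predecessor. */
    void put(String key, String previousKey) {
        entries.put(key, new Entry(previousKey));
        if (previousKey != null) {
            lock(previousKey);
        }
    }

    /** Increases the reference counter so the entry is not pruned. */
    void lock(String key) {
        Entry entry = entries.get(key);
        if (entry != null) {
            entry.referenceCounter++;
        }
    }

    /** Decreases the counter; at zero the entry is pruned and its predecessor released. */
    void release(String key) {
        Entry entry = entries.get(key);
        if (entry == null) {
            return;
        }
        entry.referenceCounter--;
        if (entry.referenceCounter <= 0) {
            entries.remove(key);
            if (entry.previousKey != null) {
                release(entry.previousKey);
            }
        }
    }

    boolean contains(String key) {
        return entries.containsKey(key);
    }

    public static void main(String[] args) {
        RefCountedBufferSketch buffer = new RefCountedBufferSketch();
        buffer.put("a", null);
        buffer.lock("a");     // a computation state refers to "a"
        buffer.put("b", "a"); // "b" points back to "a"
        buffer.lock("b");     // a computation state refers to "b"
        buffer.release("a");  // "a" is still referenced by "b"
        System.out.println(buffer.contains("a")); // prints true
        buffer.release("b");  // frees "b", then "a"
        System.out.println(buffer.contains("a")); // prints false
    }
}
```

Folding the removal into release means a caller cannot forget the second call, at the cost of a removal attempt on every release.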

http://git-wip-us.apache.org/repos/asf/flink/blob/7fbdc100/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/State.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/State.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/State.java
index 27e0dcd..c673576 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/State.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/State.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.cep.nfa;
 
-import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.cep.pattern.conditions.IterativeCondition;
 
 import java.io.IOException;
 import java.io.ObjectInputStream;
@@ -66,29 +66,29 @@ public class State<T> implements Serializable {
 	}
 
 	private void addStateTransition(
-		final StateTransitionAction action,
-		final State<T> targetState,
-		final FilterFunction<T> condition) {
+			final StateTransitionAction action,
+			final State<T> targetState,
+			final IterativeCondition<T> condition) {
 		stateTransitions.add(new StateTransition<T>(this, action, targetState, condition));
 	}
 
-	public void addIgnore(final FilterFunction<T> condition) {
+	public void addIgnore(final IterativeCondition<T> condition) {
 		addStateTransition(StateTransitionAction.IGNORE, this, condition);
 	}
 
-	public void addIgnore(final State<T> targetState,final FilterFunction<T> condition) {
+	public void addIgnore(final State<T> targetState, final IterativeCondition<T> condition) {
 		addStateTransition(StateTransitionAction.IGNORE, targetState, condition);
 	}
 
-	public void addTake(final State<T> targetState, final FilterFunction<T> condition) {
+	public void addTake(final State<T> targetState, final IterativeCondition<T> condition) {
 		addStateTransition(StateTransitionAction.TAKE, targetState, condition);
 	}
 
-	public void addProceed(final State<T> targetState, final FilterFunction<T> condition) {
+	public void addProceed(final State<T> targetState, final IterativeCondition<T> condition) {
 		addStateTransition(StateTransitionAction.PROCEED, targetState, condition);
 	}
 
-	public void addTake(final FilterFunction<T> condition) {
+	public void addTake(final IterativeCondition<T> condition) {
 		addStateTransition(StateTransitionAction.TAKE, this, condition);
 	}
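
The transitions in State now carry an IterativeCondition, which NFA evaluates as condition.filter(event, context). A minimal, self-contained sketch of the idea from the commit message follows: a price is accepted only if it is higher than the average of the previously accepted prices. The Context and Condition interfaces, the state name "middle" and the run method are hypothetical stand-ins for illustration, not Flink's API.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class IterativeConditionSketch {

    /** Hypothetical stand-in for the context handed to a condition. */
    interface Context {
        List<Double> acceptedPrices(String stateName);
    }

    /** Hypothetical stand-in for a condition that can see earlier matches. */
    interface Condition {
        boolean filter(double price, Context ctx);
    }

    /** Accepts a price only if it exceeds the average of the prices accepted so far. */
    static final Condition ABOVE_AVERAGE = (price, ctx) -> {
        List<Double> previous = ctx.acceptedPrices("middle");
        if (previous.isEmpty()) {
            return true; // nothing to compare against yet
        }
        double sum = 0;
        for (double p : previous) {
            sum += p;
        }
        return price > sum / previous.size();
    };

    /** Feeds prices through the condition and returns the accepted ones, in order. */
    static List<Double> run(double... prices) {
        Map<String, List<Double>> matches = new HashMap<>();
        matches.put("middle", new ArrayList<>());
        Context ctx = name -> matches.getOrDefault(name, new ArrayList<>());
        for (double price : prices) {
            if (ABOVE_AVERAGE.filter(price, ctx)) {
                matches.get("middle").add(price);
            }
        }
        return matches.get("middle");
    }

    public static void main(String[] args) {
        System.out.println(run(10.0, 8.0, 12.0, 11.5, 9.0)); // prints [10.0, 12.0, 11.5]
    }
}
```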
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7fbdc100/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/StateTransition.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/StateTransition.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/StateTransition.java
index e3c7b7a..f80edfc 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/StateTransition.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/StateTransition.java
@@ -19,6 +19,8 @@
 package org.apache.flink.cep.nfa;
 
 import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.cep.pattern.conditions.IterativeCondition;
+import org.apache.flink.cep.pattern.conditions.SimpleCondition;
 
 import java.io.Serializable;
 import java.util.Objects;
@@ -29,17 +31,24 @@ public class StateTransition<T> implements Serializable {
 	private final StateTransitionAction action;
 	private final State<T> sourceState;
 	private final State<T> targetState;
-	private final FilterFunction<T> condition;
+	private IterativeCondition<T> newCondition;
+
+	/**
+	 * @deprecated This field remains for backwards compatibility.
+	 * Conditions now extend {@link IterativeCondition}.
+	 */
+	@Deprecated
+	private FilterFunction<T> condition;
 
 	public StateTransition(
-		final State<T> sourceState,
-		final StateTransitionAction action,
-		final State<T> targetState,
-		final FilterFunction<T> condition) {
+			final State<T> sourceState,
+			final StateTransitionAction action,
+			final State<T> targetState,
+			final IterativeCondition<T> condition) {
 		this.action = action;
 		this.targetState = targetState;
 		this.sourceState = sourceState;
-		this.condition = condition;
+		this.newCondition = condition;
 	}
 
 	public StateTransitionAction getAction() {
@@ -54,8 +63,12 @@ public class StateTransition<T> implements Serializable {
 		return sourceState;
 	}
 
-	public FilterFunction<T> getCondition() {
-		return condition;
+	public IterativeCondition<T> getCondition() {
+		if (condition != null) {
+			this.newCondition = new FilterWrapper<>(condition);
+			this.condition = null;
+		}
+		return newCondition;
 	}
 
 	@Override
@@ -87,7 +100,7 @@ public class StateTransition<T> implements Serializable {
 			.append(sourceState.getName()).append(", ")
 			.append(targetState.getName());
 
-		if (condition != null) {
+		if (newCondition != null) {
 			builder.append(", with filter)");
 		} else {
 			builder.append(")");
@@ -95,4 +108,24 @@ public class StateTransition<T> implements Serializable {
 
 		return builder.toString();
 	}
+
+	/**
+	 * A wrapper to transform a {@link FilterFunction} into a {@link SimpleCondition}.
+	 * This is used only when migrating from an older Flink version.
+	 */
+	private static class FilterWrapper<T> extends SimpleCondition<T> {
+
+		private static final long serialVersionUID = -4973016745698940430L;
+
+		private final FilterFunction<T> filterFunction;
+
+		FilterWrapper(FilterFunction<T> filterFunction) {
+			this.filterFunction = filterFunction;
+		}
+
+		@Override
+		public boolean filter(T value) throws Exception {
+			return filterFunction.filter(value);
+		}
+	}
 }
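
The getCondition change above converts a legacy FilterFunction lazily: the deprecated field is only populated when state written by an older version is restored, and the first read wraps it and clears it. A rough sketch of the same pattern using only java.util.function types follows. The class, its factory method and the String context are assumptions for illustration and are not part of the commit.

```java
import java.util.function.BiPredicate;
import java.util.function.Predicate;

final class LazyMigrationSketch<T> {

    /** New-style condition: sees the event plus a context (a String here, for brevity). */
    private BiPredicate<T, String> newCondition;

    /** Legacy condition; non-null only for instances restored from an old version. */
    private Predicate<T> legacyCondition;

    private LazyMigrationSketch(BiPredicate<T, String> newCondition, Predicate<T> legacyCondition) {
        this.newCondition = newCondition;
        this.legacyCondition = legacyCondition;
    }

    /** Simulates an instance deserialized from state written by the old version. */
    static <T> LazyMigrationSketch<T> restoredFromOldVersion(Predicate<T> legacy) {
        return new LazyMigrationSketch<>(null, legacy);
    }

    /** Wraps the legacy condition on first access and drops the legacy reference. */
    BiPredicate<T, String> getCondition() {
        if (legacyCondition != null) {
            final Predicate<T> wrapped = legacyCondition;
            newCondition = (event, ctx) -> wrapped.test(event);
            legacyCondition = null;
        }
        return newCondition;
    }

    boolean hasLegacyCondition() {
        return legacyCondition != null;
    }

    public static void main(String[] args) {
        LazyMigrationSketch<Integer> transition =
                LazyMigrationSketch.<Integer>restoredFromOldVersion(x -> x > 3);
        System.out.println(transition.hasLegacyCondition());         // prints true
        System.out.println(transition.getCondition().test(5, "ctx")); // prints true
        System.out.println(transition.hasLegacyCondition());         // prints false
    }
}
```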

http://git-wip-us.apache.org/repos/asf/flink/blob/7fbdc100/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
index 8bd8612..4fb918f 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
@@ -21,19 +21,19 @@ package org.apache.flink.cep.nfa.compiler;
 import com.google.common.base.Predicate;
 import com.google.common.collect.Iterators;
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.functions.FilterFunction;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.cep.nfa.NFA;
 import org.apache.flink.cep.nfa.State;
 import org.apache.flink.cep.nfa.StateTransition;
 import org.apache.flink.cep.nfa.StateTransitionAction;
-import org.apache.flink.cep.pattern.FilterFunctions;
+import org.apache.flink.cep.pattern.conditions.BooleanConditions;
 import org.apache.flink.cep.pattern.FollowedByPattern;
 import org.apache.flink.cep.pattern.MalformedPatternException;
-import org.apache.flink.cep.pattern.NotFilterFunction;
+import org.apache.flink.cep.pattern.conditions.NotCondition;
 import org.apache.flink.cep.pattern.Pattern;
 import org.apache.flink.cep.pattern.Quantifier;
 import org.apache.flink.cep.pattern.Quantifier.QuantifierProperty;
+import org.apache.flink.cep.pattern.conditions.IterativeCondition;
 import org.apache.flink.streaming.api.windowing.time.Time;
 
 import javax.annotation.Nullable;
@@ -240,7 +240,7 @@ public class NFACompiler {
 
 		/**
 		 * Converts the given state into a "complex" state consisting of given number of states with
-		 * same {@link FilterFunction}
+		 * same {@link IterativeCondition}
 		 *
 		 * @param sourceState the state to be converted
 		 * @param sinkState the state that the converted state should point to
@@ -271,8 +271,9 @@ public class NFACompiler {
 		@SuppressWarnings("unchecked")
 		private void convertToSingletonState(final State<T> sourceState, final State<T> sinkState) {
 
-			final FilterFunction<T> currentFilterFunction = (FilterFunction<T>) currentPattern.getFilterFunction();
-			final FilterFunction<T> trueFunction = FilterFunctions.trueFunction();
+			final IterativeCondition<T> currentFilterFunction = (IterativeCondition<T>) currentPattern.getCondition();
+			final IterativeCondition<T> trueFunction = BooleanConditions.trueFunction();
+
 			sourceState.addTake(sinkState, currentFilterFunction);
 
 			if (currentPattern.getQuantifier() == Quantifier.OPTIONAL) {
@@ -303,16 +304,13 @@ public class NFACompiler {
 		@SuppressWarnings("unchecked")
 		private State<T> createFirstMandatoryStateOfLoop(final State<T> sinkState, final State.StateType stateType) {
 
-			final FilterFunction<T> currentFilterFunction = (FilterFunction<T>) currentPattern.getFilterFunction();
+			final IterativeCondition<T> currentFilterFunction = (IterativeCondition<T>) currentPattern.getCondition();
 			final State<T> firstState = new State<>(currentPattern.getName(), stateType);
 
 			firstState.addTake(sinkState, currentFilterFunction);
 			if (currentPattern instanceof FollowedByPattern) {
-				if (currentPattern.getQuantifier().hasProperty(QuantifierProperty.EAGER)) {
-					firstState.addIgnore(new NotFilterFunction<>(currentFilterFunction));
-				} else {
-					firstState.addIgnore(FilterFunctions.<T>trueFunction());
-				}
+				final IterativeCondition<T> ignoreCondition = getIgnoreCondition(currentPattern);
+				firstState.addIgnore(ignoreCondition);
 			}
 			return firstState;
 		}
@@ -332,8 +330,8 @@ public class NFACompiler {
 		@SuppressWarnings("unchecked")
 		private void convertToLooping(final State<T> sourceState, final State<T> sinkState, boolean isFirstState) {
 
-			final FilterFunction<T> filterFunction = (FilterFunction<T>) currentPattern.getFilterFunction();
-			final FilterFunction<T> trueFunction = FilterFunctions.<T>trueFunction();
+			final IterativeCondition<T> filterFunction = (IterativeCondition<T>) currentPattern.getCondition();
+			final IterativeCondition<T> trueFunction = BooleanConditions.<T>trueFunction();
 
 			sourceState.addProceed(sinkState, trueFunction);
 			sourceState.addTake(filterFunction);
@@ -342,13 +340,7 @@ public class NFACompiler {
 					currentPattern.getName(),
 					State.StateType.Normal);
 
-
-				final FilterFunction<T> ignoreCondition;
-				if (currentPattern.getQuantifier().hasProperty(QuantifierProperty.EAGER)) {
-					ignoreCondition = new NotFilterFunction<>(filterFunction);
-				} else {
-					ignoreCondition = trueFunction;
-				}
+				final IterativeCondition<T> ignoreCondition = getIgnoreCondition(currentPattern);
 
 				sourceState.addIgnore(ignoreState, ignoreCondition);
 				ignoreState.addTake(sourceState, filterFunction);
@@ -368,6 +360,19 @@ public class NFACompiler {
 		private void convertToLooping(final State<T> sourceState, final State<T> sinkState) {
 			convertToLooping(sourceState, sinkState, false);
 		}
+
+		/**
+		 * @return The {@link IterativeCondition condition} for the {@code IGNORE} edge
+		 * that corresponds to the specified {@link Pattern}. If the pattern is
+		 * {@link QuantifierProperty#EAGER}, the negated user-specified condition is
+		 * returned. Otherwise, a condition that always evaluates to {@code true} is
+		 * returned.
+		 */
+		private IterativeCondition<T> getIgnoreCondition(Pattern<T, ?> pattern) {
+			return pattern.getQuantifier().hasProperty(QuantifierProperty.EAGER)
+					? new NotCondition<>((IterativeCondition<T>) pattern.getCondition())
+					: BooleanConditions.<T>trueFunction();
+		}
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/7fbdc100/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/AndFilterFunction.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/AndFilterFunction.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/AndFilterFunction.java
index ecaee07..a7391d5 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/AndFilterFunction.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/AndFilterFunction.java
@@ -21,11 +21,15 @@ package org.apache.flink.cep.pattern;
 import org.apache.flink.api.common.functions.FilterFunction;
 
 /**
- * A filter function which combines two filter functions with a logical and. Thus, the filter
+ * @deprecated This is only used when migrating from an older Flink version.
+ * Use the {@link org.apache.flink.cep.pattern.conditions.AndCondition} instead.
+ *
+ * <p>A filter function which combines two filter functions with a logical and. Thus, the filter
  * function only returns true, iff both filters return true.
  *
  * @param <T> Type of the element to filter
  */
+@Deprecated
 public class AndFilterFunction<T> implements FilterFunction<T> {
 	private static final long serialVersionUID = -2109562093871155005L;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7fbdc100/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/FilterFunctions.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/FilterFunctions.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/FilterFunctions.java
deleted file mode 100644
index 12e58ba..0000000
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/FilterFunctions.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.cep.pattern;
-
-import org.apache.flink.api.common.functions.FilterFunction;
-
-public class FilterFunctions<T> {
-
-	private FilterFunctions() {
-	}
-
-	public static <T> FilterFunction<T> trueFunction()  {
-		return new FilterFunction<T>() {
-			@Override
-			public boolean filter(T value) throws Exception {
-				return true;
-			}
-		};
-	}
-
-	public static <T> FilterFunction<T> falseFunction()  {
-		return new FilterFunction<T>() {
-			@Override
-			public boolean filter(T value) throws Exception {
-				return false;
-			}
-		};
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/7fbdc100/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/NotFilterFunction.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/NotFilterFunction.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/NotFilterFunction.java
deleted file mode 100644
index a4fc8f5..0000000
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/NotFilterFunction.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.cep.pattern;
-
-import org.apache.flink.api.common.functions.FilterFunction;
-
-/**
- * A filter function which negates filter function.
- *
- * @param <T> Type of the element to filter
- */
-public class NotFilterFunction<T> implements FilterFunction<T> {
-	private static final long serialVersionUID = -2109562093871155005L;
-
-	private final FilterFunction<T> original;
-
-	public NotFilterFunction(final FilterFunction<T> original) {
-		this.original = original;
-	}
-
-	@Override
-	public boolean filter(T value) throws Exception {
-		return !original.filter(value);
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/7fbdc100/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/OrFilterFunction.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/OrFilterFunction.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/OrFilterFunction.java
index c42ecb1..3620cae 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/OrFilterFunction.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/OrFilterFunction.java
@@ -21,11 +21,15 @@ package org.apache.flink.cep.pattern;
 import org.apache.flink.api.common.functions.FilterFunction;
 
 /**
- * A filter function which combines two filter functions with a logical or. Thus, the filter
+ * @deprecated This is only used when migrating from an older Flink version.
+ * Use the {@link org.apache.flink.cep.pattern.conditions.OrCondition} instead.
+ *
+ * <p>A filter function which combines two filter functions with a logical or. Thus, the filter
  * function only returns true, iff at least one of the filter functions holds true.
  *
  * @param <T> Type of the element to filter
  */
+@Deprecated
 public class OrFilterFunction<T> implements FilterFunction<T> {
 	private static final long serialVersionUID = -2109562093871155005L;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7fbdc100/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
index 7b4d9c7..cd51788 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
@@ -18,9 +18,12 @@
 
 package org.apache.flink.cep.pattern;
 
-import org.apache.flink.api.common.functions.FilterFunction;
 import org.apache.flink.api.java.ClosureCleaner;
 import org.apache.flink.cep.nfa.NFA;
+import org.apache.flink.cep.pattern.conditions.AndCondition;
+import org.apache.flink.cep.pattern.conditions.IterativeCondition;
+import org.apache.flink.cep.pattern.conditions.OrCondition;
+import org.apache.flink.cep.pattern.conditions.SubtypeCondition;
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.util.Preconditions;
 
@@ -49,7 +52,7 @@ public class Pattern<T, F extends T> {
 	private final Pattern<T, ? extends T> previous;
 
 	// filter condition for an event to be matched
-	private FilterFunction<F> filterFunction;
+	private IterativeCondition<F> condition;
 
 	// window length in which the pattern match has to occur
 	private Time windowTime;
@@ -71,8 +74,8 @@ public class Pattern<T, F extends T> {
 		return previous;
 	}
 
-	public FilterFunction<F> getFilterFunction() {
-		return filterFunction;
+	public IterativeCondition<F> getCondition() {
+		return condition;
 	}
 
 	public Time getWindowTime() {
@@ -90,36 +93,34 @@ public class Pattern<T, F extends T> {
 	/**
 	 * Specifies a filter condition which has to be fulfilled by an event in order to be matched.
 	 *
-	 * @param newFilterFunction Filter condition
+	 * @param condition Filter condition
 	 * @return The same pattern operator where the new filter condition is set
 	 */
-	public Pattern<T, F> where(FilterFunction<F> newFilterFunction) {
-		ClosureCleaner.clean(newFilterFunction, true);
+	public Pattern<T, F> where(IterativeCondition<F> condition) {
+		ClosureCleaner.clean(condition, true);
 
-		if (this.filterFunction == null) {
-			this.filterFunction = newFilterFunction;
+		if (this.condition == null) {
+			this.condition = condition;
 		} else {
-			this.filterFunction = new AndFilterFunction<F>(this.filterFunction, newFilterFunction);
+			this.condition = new AndCondition<>(this.condition, condition);
 		}
-
 		return this;
 	}
 
 	/**
 	 * Specifies a filter condition which is OR'ed with an existing filter function.
 	 *
-	 * @param orFilterFunction OR filter condition
+	 * @param condition OR filter condition
 	 * @return The same pattern operator where the new filter condition is set
 	 */
-	public Pattern<T, F> or(FilterFunction<F> orFilterFunction) {
-		ClosureCleaner.clean(orFilterFunction, true);
+	public Pattern<T, F> or(IterativeCondition<F> condition) {
+		ClosureCleaner.clean(condition, true);
 
-		if (this.filterFunction == null) {
-			this.filterFunction = orFilterFunction;
+		if (this.condition == null) {
+			this.condition = condition;
 		} else {
-			this.filterFunction = new OrFilterFunction<>(this.filterFunction, orFilterFunction);
+			this.condition = new OrCondition<>(this.condition, condition);
 		}
-
 		return this;
 	}
 
@@ -132,10 +133,11 @@ public class Pattern<T, F extends T> {
 	 * @return The same pattern operator with the new subtype constraint
 	 */
 	public <S extends F> Pattern<T, S> subtype(final Class<S> subtypeClass) {
-		if (filterFunction == null) {
-			this.filterFunction = new SubtypeFilterFunction<F>(subtypeClass);
+		if (condition == null) {
+			this.condition = new SubtypeCondition<F>(subtypeClass);
 		} else {
-			this.filterFunction = new AndFilterFunction<F>(this.filterFunction, new SubtypeFilterFunction<F>(subtypeClass));
+			this.condition = new AndCondition<>(this.condition,
+					new SubtypeCondition<F>(subtypeClass));
 		}
 
 		@SuppressWarnings("unchecked")
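
The where()/or() changes above keep the existing chaining rule: each call combines the new condition with whatever has been set so far, so the order of calls determines the grouping. A minimal sketch of that rule follows, assuming a hypothetical MiniPattern class and java.util.function.Predicate as stand-ins for Pattern and IterativeCondition (neither stand-in is part of the commit, and treating "no condition" as "accept everything" is an assumption of the sketch only):

```java
import java.util.function.Predicate;

final class WhereOrChainingSketch {

    /** Hypothetical miniature of Pattern: it only tracks the combined condition. */
    static final class MiniPattern<T> {
        private Predicate<T> condition; // null until the first where()/or() call

        /** AND the new condition with the existing one, as Pattern.where() does. */
        MiniPattern<T> where(Predicate<T> next) {
            condition = (condition == null) ? next : condition.and(next);
            return this;
        }

        /** OR the new condition with the existing one, as Pattern.or() does. */
        MiniPattern<T> or(Predicate<T> next) {
            condition = (condition == null) ? next : condition.or(next);
            return this;
        }

        /** Sketch-only assumption: a pattern without a condition accepts everything. */
        boolean accepts(T value) {
            return condition == null || condition.test(value);
        }
    }

    public static void main(String[] args) {
        Predicate<Integer> isEven = n -> n % 2 == 0;
        Predicate<Integer> isNegative = n -> n < 0;
        Predicate<Integer> isSmall = n -> Math.abs(n) < 10;

        // Grouped as (isEven OR isNegative) AND isSmall.
        MiniPattern<Integer> first =
                new MiniPattern<Integer>().where(isEven).or(isNegative).where(isSmall);
        // Grouped as (isEven AND isSmall) OR isNegative.
        MiniPattern<Integer> second =
                new MiniPattern<Integer>().where(isEven).where(isSmall).or(isNegative);

        System.out.println(first.accepts(-15));  // false: -15 fails isSmall
        System.out.println(second.accepts(-15)); // true: isNegative alone suffices
    }
}
```

The same three conditions give different results depending on call order, which is worth keeping in mind when mixing where() and or() on one pattern.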

http://git-wip-us.apache.org/repos/asf/flink/blob/7fbdc100/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/SubtypeFilterFunction.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/SubtypeFilterFunction.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/SubtypeFilterFunction.java
index f183f0f..ae48df3 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/SubtypeFilterFunction.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/SubtypeFilterFunction.java
@@ -21,11 +21,15 @@ package org.apache.flink.cep.pattern;
 import org.apache.flink.api.common.functions.FilterFunction;
 
 /**
- * A filter function which filters elements of the given type. A element if filtered out iff it
+ * @deprecated This is only used when migrating from an older Flink version.
+ * Use the {@link org.apache.flink.cep.pattern.conditions.SubtypeCondition} instead.
+ *
+ * <p>A filter function which filters elements of the given type. An element is filtered out iff it
  * is not assignable to the given subtype of T.
  *
  * @param <T> Type of the elements to be filtered
  */
+@Deprecated
 public class SubtypeFilterFunction<T> implements FilterFunction<T> {
 	private static final long serialVersionUID = -2990017519957561355L;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7fbdc100/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/AndCondition.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/AndCondition.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/AndCondition.java
new file mode 100644
index 0000000..5df7c66
--- /dev/null
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/AndCondition.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.pattern.conditions;
+
+/**
+ * A {@link IterativeCondition condition} which combines two conditions with a logical
+ * {@code AND} and returns {@code true} if both are {@code true}.
+ *
+ * @param <T> Type of the element to filter
+ */
+public class AndCondition<T> extends IterativeCondition<T> {
+
+	private static final long serialVersionUID = -2471892317390197319L;
+
+	private final IterativeCondition<T> left;
+	private final IterativeCondition<T> right;
+
+	public AndCondition(final IterativeCondition<T> left, final IterativeCondition<T> right) {
+		this.left = left;
+		this.right = right;
+	}
+
+	@Override
+	public boolean filter(T value, Context<T> ctx) throws Exception {
+		return left.filter(value, ctx) && right.filter(value, ctx);
+	}
+
+	/**
+	 * @return One of the {@link IterativeCondition conditions} combined in this condition.
+	 */
+	public IterativeCondition<T> getLeft() {
+		return left;
+	}
+
+	/**
+	 * @return One of the {@link IterativeCondition conditions} combined in this condition.
+	 */
+	public IterativeCondition<T> getRight() {
+		return right;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7fbdc100/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/BooleanConditions.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/BooleanConditions.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/BooleanConditions.java
new file mode 100644
index 0000000..d67b407
--- /dev/null
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/BooleanConditions.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.cep.pattern.conditions;
+
+/**
+ * Utility class containing an {@link IterativeCondition} that always returns
+ * {@code true} and one that always returns {@code false}.
+ */
+public class BooleanConditions {
+
+	/**
+	 * @return An {@link IterativeCondition} that always returns {@code true}.
+	 */
+	public static <T> IterativeCondition<T> trueFunction()  {
+		return new SimpleCondition<T>() {
+			private static final long serialVersionUID = 8379409657655181451L;
+
+			@Override
+			public boolean filter(T value) throws Exception {
+				return true;
+			}
+		};
+	}
+
+	/**
+	 * @return An {@link IterativeCondition} that always returns {@code false}.
+	 */
+	public static <T> IterativeCondition<T> falseFunction()  {
+		return new SimpleCondition<T>() {
+			private static final long serialVersionUID = -823981593720949910L;
+
+			@Override
+			public boolean filter(T value) throws Exception {
+				return false;
+			}
+		};
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7fbdc100/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/IterativeCondition.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/IterativeCondition.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/IterativeCondition.java
new file mode 100644
index 0000000..f225e01
--- /dev/null
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/IterativeCondition.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.pattern.conditions;
+
+import org.apache.flink.api.common.functions.Function;
+
+import java.io.Serializable;
+
+/**
+ * A user-defined condition that decides if an element should be accepted in the pattern or not.
+ * Accepting an element also signals a state transition for the corresponding {@link org.apache.flink.cep.nfa.NFA}.
+ *
+ * <p>A condition can be a simple filter or a more complex condition that iterates over the previously accepted
+ * elements in the pattern and decides to accept a new element or not based on some statistic over these elements.
+ * In the former case, the condition should extend the {@link SimpleCondition} class. In the latter, the condition
+ * should extend this class, which also gives you access to the previously accepted elements through a {@link Context}.
+ *
+ * <p>An iterative condition that accepts an element if i) its name is "middle", and ii) the sum of the prices of all
+ * accepted elements, including the new one, does not exceed {@code 5} would look like:
+ *
+ * <pre>
+ * {@code
+ * private class MyCondition extends IterativeCondition<Event> {
+ *
+ *     @Override
+ *     public boolean filter(Event value, Context<Event> ctx) throws Exception {
+ *         if (!value.getName().equals("middle")) {
+ *             return false;
+ *         }
+ *
+ *         double sum = 0.0;
+ *         for (Event e : ctx.getEventsForPattern("middle")) {
+ *             sum += e.getPrice();
+ *         }
+ *         sum += value.getPrice();
+ *         return Double.compare(sum, 5.0) <= 0;
+ *     }
+ * }
+ * }
+ * </pre>
+ *
+ * <b>ATTENTION: </b> The call to {@link Context#getEventsForPattern(String) getEventsForPattern(...)} has to find
+ * the elements that belong to the pattern among the elements stored by the NFA. The cost of this operation can vary,
+ * so when implementing your condition, try to minimize the number of times the method is called.
+ */
+public abstract class IterativeCondition<T> implements Function, Serializable {
+
+	private static final long serialVersionUID = 7067817235759351255L;
+
+	/**
+	 * The filter function that evaluates the predicate.
+	 * <p>
+	 * <strong>IMPORTANT:</strong> The system assumes that the function does not
+	 * modify the elements on which the predicate is applied. Violating this assumption
+	 * can lead to incorrect results.
+	 *
+	 * @param value The value to be tested.
+	 * @param ctx The {@link Context} used for the evaluation of the function, which provides access to
+	 *            the already accepted events in the pattern (see {@link Context#getEventsForPattern(String)}).
+	 * @return {@code true} for values that should be retained, {@code false}
+	 * for values to be filtered out.
+	 *
+	 * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
+	 *                   to fail and may trigger recovery.
+	 */
+	public abstract boolean filter(T value, Context<T> ctx) throws Exception;
+
+	/**
+	 * The context used when evaluating the {@link IterativeCondition condition}.
+	 */
+	public interface Context<T> extends Serializable {
+
+		/**
+		 * @return An {@link Iterable} over the already accepted elements
+		 * for a given pattern. Elements are iterated in the order they were
+		 * inserted in the pattern.
+		 *
+		 * @param name The name of the pattern.
+		 */
+		Iterable<T> getEventsForPattern(String name);
+	}
+}
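
To make the Javadoc example above concrete without a Flink dependency, here is a minimal, self-contained sketch of how such a condition behaves when events arrive one at a time. The Context, IterativeCondition and Event types below are hypothetical stand-ins that only imitate the shapes introduced in this commit; the stand-in filter omits "throws Exception", and the stand-in context ignores the pattern name and returns every accepted event. The driver loop is an illustration, not how the NFA evaluates conditions.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class IterativeConditionSketch {

    /** Hypothetical stand-in for IterativeCondition.Context. */
    interface Context<T> {
        Iterable<T> getEventsForPattern(String name);
    }

    /** Hypothetical stand-in for IterativeCondition (checked exception omitted). */
    abstract static class IterativeCondition<T> {
        abstract boolean filter(T value, Context<T> ctx);
    }

    /** Hypothetical event type with just the two fields the condition needs. */
    static final class Event {
        final String name;
        final double price;

        Event(String name, double price) {
            this.name = name;
            this.price = price;
        }
    }

    /** Accepts "middle" events while the running price sum stays at or below 5. */
    static final class PriceSumCondition extends IterativeCondition<Event> {
        @Override
        boolean filter(Event value, Context<Event> ctx) {
            if (!"middle".equals(value.name)) {
                return false; // cheap check first, before touching the context
            }
            double sum = value.price;
            // Query the context exactly once per evaluation.
            for (Event e : ctx.getEventsForPattern("middle")) {
                sum += e.price;
            }
            return sum <= 5.0;
        }
    }

    /** Feeds the events through the condition and returns the accepted prices. */
    static List<Double> acceptedPrices(List<Event> input) {
        List<Event> accepted = new ArrayList<>();
        Context<Event> ctx = name -> accepted; // sketch: the name is ignored
        IterativeCondition<Event> condition = new PriceSumCondition();
        List<Double> prices = new ArrayList<>();
        for (Event e : input) {
            if (condition.filter(e, ctx)) {
                accepted.add(e);
                prices.add(e.price);
            }
        }
        return prices;
    }

    public static void main(String[] args) {
        List<Event> input = Arrays.asList(
                new Event("middle", 2.0), new Event("middle", 2.0),
                new Event("middle", 2.0), new Event("other", 1.0),
                new Event("middle", 1.0));
        System.out.println(acceptedPrices(input)); // [2.0, 2.0, 1.0]
    }
}
```

The third event is rejected because it would push the sum to 6, while the last one is accepted because the sum lands exactly on 5.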

http://git-wip-us.apache.org/repos/asf/flink/blob/7fbdc100/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/NotCondition.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/NotCondition.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/NotCondition.java
new file mode 100644
index 0000000..3e6ab56
--- /dev/null
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/NotCondition.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.pattern.conditions;
+
+/**
+ * A {@link IterativeCondition condition} which negates the condition it wraps
+ * and returns {@code true} if the original condition returns {@code false}.
+ *
+ * @param <T> Type of the element to filter
+ */
+public class NotCondition<T> extends IterativeCondition<T> {
+	private static final long serialVersionUID = -2109562093871155005L;
+
+	private final IterativeCondition<T> original;
+
+	public NotCondition(final IterativeCondition<T> original) {
+		this.original = original;
+	}
+
+	@Override
+	public boolean filter(T value, Context<T> ctx) throws Exception {
+		return !original.filter(value, ctx);
+	}
+}
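
AndCondition evaluates its right operand only when the left one returns true, and NotCondition is what getIgnoreCondition wraps around the user condition for eager patterns. The sketch below illustrates both points with a hypothetical single-argument Condition interface (the Context parameter is left out for brevity, and none of these names come from the commit). It suggests, as an assumption rather than a documented guarantee, that placing the cheap check on the left can spare calls to an expensive one such as a condition that iterates over getEventsForPattern(...).

```java
import java.util.concurrent.atomic.AtomicInteger;

final class ConditionCombinatorSketch {

    /** Hypothetical stand-in for IterativeCondition; the Context argument is omitted. */
    interface Condition<T> {
        boolean filter(T value);
    }

    /** Same shape as AndCondition.filter: the right side runs only if the left is true. */
    static <T> Condition<T> and(Condition<T> left, Condition<T> right) {
        return value -> left.filter(value) && right.filter(value);
    }

    /** Same shape as NotCondition.filter. */
    static <T> Condition<T> not(Condition<T> original) {
        return value -> !original.filter(value);
    }

    /** Mirrors the choice in getIgnoreCondition: negate if eager, otherwise always true. */
    static <T> Condition<T> ignoreCondition(boolean eager, Condition<T> userCondition) {
        if (eager) {
            return not(userCondition);
        }
        return value -> true;
    }

    /** Counts how often the "expensive" right-hand side runs for one input. */
    static int expensiveCallsFor(String name) {
        AtomicInteger calls = new AtomicInteger();
        Condition<String> cheap = "middle"::equals;
        Condition<String> expensive = value -> {
            calls.incrementAndGet(); // stands in for iterating over earlier events
            return true;
        };
        and(cheap, expensive).filter(name);
        return calls.get();
    }

    public static void main(String[] args) {
        System.out.println(expensiveCallsFor("start"));  // 0: left side already false
        System.out.println(expensiveCallsFor("middle")); // 1: right side evaluated

        Condition<String> isMiddle = "middle"::equals;
        System.out.println(ignoreCondition(true, isMiddle).filter("start"));   // true
        System.out.println(ignoreCondition(true, isMiddle).filter("middle"));  // false
        System.out.println(ignoreCondition(false, isMiddle).filter("middle")); // true
    }
}
```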

