flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kklou...@apache.org
Subject [1/2] flink git commit: [FLINK-6198] [cep] [doc] Update CEP documentation.
Date Mon, 12 Jun 2017 09:29:24 GMT
Repository: flink
Updated Branches:
  refs/heads/release-1.3 d24515fee -> 261bb3826


[FLINK-6198] [cep] [doc] Update CEP documentation.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c35b1daf
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c35b1daf
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c35b1daf

Branch: refs/heads/release-1.3
Commit: c35b1dafd8cc83d2929e0ebcc140942ab981001e
Parents: d24515f
Author: kkloudas <kkloudas@gmail.com>
Authored: Tue May 23 19:07:42 2017 +0200
Committer: kkloudas <kkloudas@gmail.com>
Committed: Mon Jun 12 11:28:51 2017 +0200

----------------------------------------------------------------------
 docs/dev/libs/cep.md | 1029 ++++++++++++++++++++++++++++++---------------
 1 file changed, 680 insertions(+), 349 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c35b1daf/docs/dev/libs/cep.md
----------------------------------------------------------------------
diff --git a/docs/dev/libs/cep.md b/docs/dev/libs/cep.md
index a5ca8b1..6a2fb53 100644
--- a/docs/dev/libs/cep.md
+++ b/docs/dev/libs/cep.md
@@ -23,22 +23,24 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-FlinkCEP is the complex event processing library for Flink.
-It allows you to easily detect complex event patterns in a stream of endless data.
-Complex events can then be constructed from matching sequences.
-This gives you the opportunity to quickly get hold of what's really important in your data.
+FlinkCEP is the Complex Event Processing (CEP) library implemented on top of Flink.
+It allows you to easily detect event patterns in an endless stream of events, thus
+giving you the opportunity to quickly get hold of what's really important in your 
+data.
 
-<span class="label label-danger">Attention</span> The events in the `DataStream` to which
-you want to apply pattern matching have to implement proper `equals()` and `hashCode()` methods
-because these are used for comparing and matching events.
+This page describes the API calls available in Flink CEP. We start by presenting the [Pattern API](#the-pattern-api), 
+which allows you to specify the patterns that you want to detect in your stream, before presenting how you can 
+[detect and act upon matching event sequences](#detecting-patterns). At the end, we present the assumptions the CEP 
+library makes when [dealing with lateness](#handling-lateness-in-event-time) in event time and how you can 
+[migrate your job](#migrating-from-an-older-flink-version) from an older Flink version to Flink-1.3.
 
 * This will be replaced by the TOC
 {:toc}
 
 ## Getting Started
 
-If you want to jump right in, you have to [set up a Flink program]({{ site.baseurl }}/dev/linking_with_flink.html).
-Next, you have to add the FlinkCEP dependency to the `pom.xml` of your project.
+If you want to jump right in, you have to [set up a Flink program]({{ site.baseurl }}/dev/linking_with_flink.html) and 
+add the FlinkCEP dependency to the `pom.xml` of your project.
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
@@ -65,21 +67,49 @@ Next, you have to add the FlinkCEP dependency to the `pom.xml` of your project.
 Note that FlinkCEP is currently not part of the binary distribution.
 See linking with it for cluster execution [here]({{site.baseurl}}/dev/linking.html).
 
-Now you can start writing your first CEP program using the pattern API.
+Now you can start writing your first CEP program using the Pattern API.
+
+<span class="label label-danger">Attention</span> The events in the `DataStream` to which
+you want to apply pattern matching have to implement proper `equals()` and `hashCode()` methods
+because these are used for comparing and matching events.
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 {% highlight java %}
 DataStream<Event> input = ...
 
-Pattern<Event, ?> pattern = Pattern.begin("start").where(evt -> evt.getId() == 42)
-    .next("middle").subtype(SubEvent.class).where(subEvt -> subEvt.getVolume() >= 10.0)
-    .followedBy("end").where(evt -> evt.getName().equals("end"));
+Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(
+        new SimpleCondition<Event>() {
+            @Override
+            public boolean filter(Event event) {
+                return event.getId() == 42;
+            }
+        }
+    ).next("middle").subtype(SubEvent.class).where(
+        new SimpleCondition<Event>() {
+            @Override
+            public boolean filter(SubEvent subEvent) {
+                return subEvent.getVolume() >= 10.0;
+            }
+        }
+    ).followedBy("end").where(
+         new SimpleCondition<Event>() {
+            @Override
+            public boolean filter(Event event) {
+                return event.getName().equals("end");
+            }
+         }
+    );
 
 PatternStream<Event> patternStream = CEP.pattern(input, pattern);
 
-DataStream<Alert> result = patternStream.select(pattern -> {
-    return createAlertFrom(pattern);
+DataStream<Alert> result = patternStream.select(
+    new PatternSelectFunction<Event, Alert> {
+        @Override
+        public Alert select(Map<String, List<Event>> pattern) throws Exception {
+            return createAlertFrom(pattern);
+        }
+    }
 });
 {% endhighlight %}
 </div>
@@ -98,48 +128,107 @@ val result: DataStream[Alert] = patternStream.select(createAlert(_))
 </div>
 </div>
 
-Note that we use Java 8 lambdas in our Java code examples to make them more succinct.
-
 ## The Pattern API
 
-The pattern API allows you to quickly define complex event patterns.
-
-Each pattern consists of multiple stages or what we call states.
-In order to go from one state to the next, the user can specify conditions.
-These conditions can be the contiguity of events or a filter condition on an event.
-
-Each pattern has to start with an initial state:
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-Pattern<Event, ?> start = Pattern.<Event>begin("start");
-{% endhighlight %}
-</div>
-
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-val start : Pattern[Event, _] = Pattern.begin("start")
-{% endhighlight %}
-</div>
-</div>
-
-Each state must have a unique name to identify the matched events later on.
-Additionally, we can specify a filter condition for the event to be accepted as the start event via the `where` method.
-These filtering conditions can be either an `IterativeCondition` or a `SimpleCondition`. 
-
-**Iterative Conditions:** This type of conditions can iterate over the previously accepted elements in the pattern and 
-decide to accept a new element or not, based on some statistic over those elements. 
-
-Below is the code for an iterative condition that accepts elements whose name start with "foo" and for which, the sum 
-of the prices of the previously accepted elements for a state named "middle", plus the price of the current event, do 
-not exceed the value of 5.0. Iterative condition can be very powerful, especially in combination with quantifiers, e.g.
-`oneToMany` or `zeroToMany`.
+The pattern API allows you to quickly define complex pattern sequences that you want to extract 
+from your input stream.
+
+Each such complex pattern sequence consists of multiple simple patterns, i.e. patterns looking for 
+individual events with the same properties. From now on, these simple patterns will be called **patterns**, and
+the final complex pattern sequence we are searching in the stream, **pattern sequence**. A pattern sequence 
+can be seen as a graph of such patterns, where transitions from one pattern to the next occur based on user-specified
+*conditions*, e.g. `event.getName().equals("start")`. A *match* is a sequence of input events which visits all 
+patterns of the complex pattern graph, through a sequence of valid pattern transitions.
+
+<span class="label label-danger">Attention</span> Each pattern must have a unique name to identify the matched 
+events later on. 
+
+<span class="label label-danger">Attention</span> Pattern names **CANNOT** contain the character `":"`.
+
+In the remainder, we start by describing how to define [Patterns](#individual-patterns), before describing how you can 
+combine individual patterns into [Complex Patterns](#combining-patterns).
+
+### Individual Patterns
+
+A **Pattern** can be either a *singleton* pattern, or a *looping* one. Singleton patterns accept a single 
+event, while looping ones can accept more than one. In pattern matching symbols, in the pattern `"a b+ c? d"` (or `"a"`, 
+followed by *one or more* `"b"`'s, optionally followed by a `"c"`, followed by a `"d"`), `a`, `c?`, and `d` are 
+singleton patterns, while `b+` is a looping one. By default, a pattern is a singleton pattern and you can transform 
+it to a looping one using [Quantifiers](#quantifiers). In addition, each pattern can have one or more 
+[Conditions](#conditions) based on which it accepts events.
+
+#### Quantifiers
+
+In FlinkCEP, looping patterns can be specified using the methods: `pattern.oneOrMore()`, for patterns that expect one or
+more occurrences of a given event (e.g. the `b+` mentioned previously), and `pattern.times(#ofTimes)` for patterns that 
+expect a specific number of occurrences of a given type of event, e.g. 4 `a`'s. All patterns, looping or not, can be made 
+optional using the `pattern.optional()` method. For a pattern named `start`, the following are valid quantifiers:
+ 
+ <div class="codetabs" markdown="1">
+ <div data-lang="java" markdown="1">
+ {% highlight java %}
+ // expecting 4 occurrences
+ start.times(4);
+  
+ // expecting 0 or 4 occurrences
+ start.times(4).optional();
+ 
+ // expecting 1 or more occurrences
+ start.oneOrMore();
+   
+ // expecting 0 or more occurrences
+ start.oneOrMore().optional();
+ {% endhighlight %}
+ </div>
+ 
+ <div data-lang="scala" markdown="1">
+ {% highlight scala %}
+ // expecting 4 occurrences
+ start.times(4)
+   
+ // expecting 0 or 4 occurrences
+ start.times(4).optional()
+  
+ // expecting 1 or more occurrences
+ start.oneOrMore()
+    
+ // expecting 0 or more occurrences
+ start.oneOrMore().optional()
+ {% endhighlight %}
+ </div>
+ </div>
+
+#### Conditions
+
+At every pattern, and in order to go from one pattern to the next, you can specify additional **conditions**. 
+These conditions can be related to:
+ 
+ 1. a [property of the incoming event](#conditions-on-properties), e.g. its value should be larger than 5, 
+ or larger than the average value of the previously accepted events.
+
+ 2. the [contiguity of the matching events](#conditions-on-contiguity), e.g. detect pattern `a,b,c` without 
+ non-matching events between any matching ones.
+ 
+The latter refers to "looping" patterns, *i.e.* patterns that can accept more than one event, e.g. the `b+` in `a b+ c`, 
+which searches for one or more `b`'s.
+
+##### Conditions on Properties
+
+Conditions on the event properties can be specified via the `pattern.where()` or the `pattern.or()` method. These can 
+be either `IterativeCondition`s or `SimpleCondition`s.
+
+**Iterative Conditions:** This is the most general type of conditions. This is how you can specify a condition that 
+accepts subsequent events based on properties of the previously accepted events or some statistic over a subset of them. 
+
+Below is the code for an iterative condition that accepts the next event for a pattern named "middle" if its name starts 
+with "foo", and if the sum of the prices of the previously accepted events for that pattern plus the price of the current 
+event do not exceed the value of 5.0. Iterative conditions can be very powerful, especially in combination with looping 
+patterns, e.g. `oneOrMore()`.
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 {% highlight java %}
-start.where(new IterativeCondition<SubEvent>() {
+middle.oneOrMore().where(new IterativeCondition<SubEvent>() {
     @Override
     public boolean filter(SubEvent value, Context<SubEvent> ctx) throws Exception {
         if (!value.getName().startsWith("foo")) {
@@ -158,7 +247,7 @@ start.where(new IterativeCondition<SubEvent>() {
 
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
-start.where(
+middle.oneOrMore().where(
     (value, ctx) => {
         lazy val sum = ctx.getEventsForPattern("middle").asScala.map(_.getPrice).sum
         value.getName.startsWith("foo") && sum + value.getPrice < 5.0
@@ -168,12 +257,12 @@ start.where(
 </div>
 </div>
 
-<span class="label label-danger">Attention</span> The call to `Context.getEventsForPattern(...)` has to find the 
-elements that belong to the pattern. The cost of this operation can vary, so when implementing your condition, try 
-to minimize the times the method is called.
+<span class="label label-danger">Attention</span> The call to `context.getEventsForPattern(...)` finds all the 
+previously accepted events for a given potential match. The cost of this operation can vary, so when implementing 
+your condition, try to minimize its use.
 
-**Simple Conditions:** This type of conditions extend the aforementioned `IterativeCondition` class. They are simple 
-filtering conditions that decide to accept an element or not, based only on properties of the element itself.
+**Simple Conditions:** This type of condition extends the aforementioned `IterativeCondition` class and decides 
+whether to accept an event or not, based *only* on properties of the event itself.
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
@@ -181,7 +270,7 @@ filtering conditions that decide to accept an element or not, based only on prop
 start.where(new SimpleCondition<Event>() {
     @Override
     public boolean filter(Event value) {
-        return ... // some condition
+        return value.getName().startsWith("foo");
     }
 });
 {% endhighlight %}
@@ -189,12 +278,13 @@ start.where(new SimpleCondition<Event>() {
 
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
-start.where(event => ... /* some condition */)
+start.where(event => event.getName.startsWith("foo"))
 {% endhighlight %}
 </div>
 </div>
 
-We can also restrict the type of the accepted event to some subtype of the initial event type (here `Event`) via the `subtype` method.
+Finally, we can also restrict the type of the accepted event to some subtype of the initial event type (here `Event`) 
+via the `pattern.subtype(subClass)` method.
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
@@ -215,12 +305,10 @@ start.subtype(classOf[SubEvent]).where(subEvent => ... /* some condition */)
 </div>
 </div>
 
-As it can be seen here, the subtype condition can also be combined with an additional filter condition on the subtype.
-In fact, you can always provide multiple conditions by calling `where` and `subtype` multiple times.
-These conditions will then be combined using the logical AND operator.
-
-In order to construct or conditions, one has to call the `or` method with a respective filter function.
-Any existing filter function is then ORed with the given one.
+**Combining Conditions:** As shown, the `subtype` condition can be combined with additional conditions. 
+In fact, this holds for every condition. You can arbitrarily combine conditions by sequentially calling 
+`where()`. The final result will be the logical **AND** of the results of the individual conditions. In 
+order to combine conditions using **OR**, you can call the `or()` method, as shown below.
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
@@ -246,65 +334,412 @@ pattern.where(event => ... /* some condition */).or(event => ... /* or condition
 </div>
 </div>
 
-Next, we can append further states to detect complex patterns.
-We can control the contiguity of two succeeding events to be accepted by the pattern.
+##### Conditions on Contiguity
+
+FlinkCEP supports the following forms of contiguity between events:
+
+ 1. Strict Contiguity: which expects all matching events to appear strictly the one after the other,
+ without any non-matching events in-between.
 
-Strict contiguity means that two matching events have to be directly the one after the other.
-This means that no other events can occur in between. 
-A strict contiguity pattern state can be created via the `next` method.
+ 2. Relaxed Contiguity: which simply ignores non-matching events appearing in-between the matching ones.
+ 
+ 3. Non-Deterministic Relaxed Contiguity: which further relaxes contiguity, allowing additional matches 
+ that ignore some matching events.
+
+To illustrate the above with an example, a pattern sequence `"a+ b"` (one or more `"a"`'s followed by a `"b"`) with 
+input `"a1", "c", "a2", "b"` will have the following results:
+
+ 1. Strict Contiguity: `{a2 b}` -- the `"c"` after `"a1"` causes `"a1"` to be discarded.
+
+ 2. Relaxed Contiguity: `{a1 b}` and `{a1 a2 b}` -- `c` is simply ignored.
+ 
+ 3. Non-Deterministic Relaxed Contiguity: `{a1 b}`, `{a2 b}` and `{a1 a2 b}`.
+ 
+For looping patterns (e.g. `oneOrMore()` and `times()`) the default is *relaxed contiguity*. If you want 
+strict contiguity, you have to explicitly specify it by using the `consecutive()` call, and if you want 
+*non-deterministic relaxed contiguity* you can use the `allowCombinations()` call.
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
+<table class="table table-bordered">
+    <thead>
+        <tr>
+            <th class="text-left" style="width: 25%">Pattern Operation</th>
+            <th class="text-center">Description</th>
+        </tr>
+    </thead>
+    <tbody>
+       <tr>
+            <td><strong>where(condition)</strong></td>
+            <td>
+                <p>Defines a condition for the current pattern. To match the pattern, an event must satisfy the condition.
+                 Multiple consecutive where() clauses lead to their condtions being ANDed:</p>
+{% highlight java %}
+pattern.where(new IterativeCondition<Event>() {
+    @Override
+    public boolean filter(Event value, Context ctx) throws Exception {
+        return ... // some condition
+    }
+});
+{% endhighlight %}
+            </td>
+        </tr>
+        <tr>
+            <td><strong>or(condition)</strong></td>
+            <td>
+                <p>Adds a new condition which is ORed with an existing one. An event can match the pattern only if it 
+                passes at least one of the conditions:</p>
+{% highlight java %}
+pattern.where(new IterativeCondition<Event>() {
+    @Override
+    public boolean filter(Event value, Context ctx) throws Exception {
+        return ... // some condition
+    }
+}).or(new IterativeCondition<Event>() {
+    @Override
+    public boolean filter(Event value, Context ctx) throws Exception {
+        return ... // alternative condition
+    }
+});
+{% endhighlight %}
+                    </td>
+                </tr>
+       <tr>
+           <td><strong>subtype(subClass)</strong></td>
+           <td>
+               <p>Defines a subtype condition for the current pattern. An event can only match the pattern if it is 
+                of this subtype:</p>
+{% highlight java %}
+pattern.subtype(SubEvent.class);
+{% endhighlight %}
+           </td>
+       </tr>
+       <tr>
+          <td><strong>oneOrMore()</strong></td>
+          <td>
+              <p>Specifies that this pattern expects at least one occurrence of a matching event.</p>
+              <p>By default a relaxed internal contiguity (between subsequent events) is used. For more info on the 
+              internal contiguity see <a href="#consecutive_java">consecutive</a></p>
+      {% highlight java %}
+      pattern.oneOrMore();
+      {% endhighlight %}
+          </td>
+       </tr>
+       <tr>
+          <td><strong>times(#ofTimes)</strong></td>
+          <td>
+              <p>Specifies that this pattern expects an exact number of occurrences of a matching event.</p>
+              <p>By default a relaxed internal contiguity (between subsequent events) is used. For more info on the 
+              internal contiguity see <a href="#consecutive_java">consecutive</a></p>
 {% highlight java %}
-Pattern<Event, ?> strictNext = start.next("middle");
+pattern.times(2);
 {% endhighlight %}
+          </td>
+       </tr>
+       <tr>
+          <td><strong>optional()</strong></td>
+          <td>
+              <p>Specifies that this pattern is optional, i.e. it may not occur at all. This is applicable to all 
+              aforementioned quantifiers.</p>
+      {% highlight java %}
+      pattern.oneOrMore().optional();
+      {% endhighlight %}
+          </td>
+       </tr>
+       <tr>
+          <td><strong>consecutive()</strong><a name="consecutive_java"></a></td>
+          <td>
+              <p>Works in conjunction with oneOrMore() and times() and imposes strict contiguity between the matching 
+              events, i.e. any non-matching element breaks the match (as in next()).</p>
+              <p>If not applied a relaxed contiguity (as in followedBy()) is used.</p>
+            
+              <p>E.g. a pattern like:</p>
+              {% highlight java %}
+              Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+                @Override
+                public boolean filter(Event value) throws Exception {
+                  return value.getName().equals("c");
+                }
+              })
+              .followedBy("middle").where(new SimpleCondition<Event>() {
+                @Override
+                public boolean filter(Event value) throws Exception {
+                  return value.getName().equals("a");
+                }
+              }).oneOrMore().consecutive()
+              .followedBy("end1").where(new SimpleCondition<Event>() {
+                @Override
+                public boolean filter(Event value) throws Exception {
+                  return value.getName().equals("b");
+                }
+              });
+              {% endhighlight %}
+              <p>Will generate the following matches for an input sequence: C D A1 A2 A3 D A4 B</p>
+            
+              <p>with consecutive applied: {C A1 B}, {C A1 A2 B}, {C A1 A2 A3 B}</p>
+              <p>without consecutive applied: {C A1 B}, {C A1 A2 B}, {C A1 A2 A3 B}, {C A1 A2 A3 A4 B}</p>
+          </td>
+       </tr>
+       <tr>
+       <td><strong>allowCombinations()</strong><a name="allow_comb_java"></a></td>
+       <td>
+              <p>Works in conjunction with oneOrMore() and times() and imposes non-deterministic relaxed contiguity 
+              between the matching events (as in followedByAny()).</p>
+              <p>If not applied a relaxed contiguity (as in followedBy) is used.</p>
+                   
+              <p>E.g. a pattern like:</p>
+              {% highlight java %}
+              Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+                @Override
+                public boolean filter(Event value) throws Exception {
+                  return value.getName().equals("c");
+                }
+              })
+              .followedBy("middle").where(new SimpleCondition<Event>() {
+                @Override
+                public boolean filter(Event value) throws Exception {
+                  return value.getName().equals("a");
+                }
+              }).oneOrMore().allowCombinations()
+              .followedBy("end1").where(new SimpleCondition<Event>() {
+                @Override
+                public boolean filter(Event value) throws Exception {
+                  return value.getName().equals("b");
+                }
+              });
+              {% endhighlight %}
+               <p>Will generate the following matches for an input sequence: C D A1 A2 A3 D A4 B</p>
+               
+               <p>with combinations enabled: {C A1 B}, {C A1 A2 B}, {C A1 A3 B}, {C A1 A4 B}, {C A1 A2 A3 B}, {C A1 A2 A4 B}, {C A1 A3 A4 B}, {C A1 A2 A3 A4 B}</p>
+               <p>without combinations enabled: {C A1 B}, {C A1 A2 B}, {C A1 A2 A3 B}, {C A1 A2 A3 A4 B}</p>
+       </td>
+       </tr>
+  </tbody>
+</table>
 </div>
 
 <div data-lang="scala" markdown="1">
+<table class="table table-bordered">
+    <thead>
+        <tr>
+            <th class="text-left" style="width: 25%">Pattern Operation</th>
+            <th class="text-center">Description</th>
+        </tr>
+    </thead>
+    <tbody>
+      
+        <tr>
+            <td><strong>where(condition)</strong></td>
+            <td>
+              <p>Defines a condition for the current pattern. To match the pattern, an event must satisfy the condition.
+                                  Multiple consecutive where() clauses lead to their condtions being ANDed:</p>
+{% highlight scala %}
+pattern.where(event => ... /* some condition */)
+{% endhighlight %}
+            </td>
+        </tr>
+        <tr>
+            <td><strong>or(condition)</strong></td>
+            <td>
+                <p>Adds a new condition which is ORed with an existing one. An event can match the pattern only if it 
+                passes at least one of the conditions:</p>
+{% highlight scala %}
+pattern.where(event => ... /* some condition */)
+    .or(event => ... /* alternative condition */)
+{% endhighlight %}
+                    </td>
+                </tr>
+       <tr>
+           <td><strong>subtype(subClass)</strong></td>
+           <td>
+               <p>Defines a subtype condition for the current pattern. An event can only match the pattern if it is 
+               of this subtype:</p>
 {% highlight scala %}
-val strictNext: Pattern[Event, _] = start.next("middle")
+pattern.subtype(classOf[SubEvent])
 {% endhighlight %}
+           </td>
+       </tr>
+       <tr>
+          <td><strong>oneOrMore()</strong></td>
+          <td>
+               <p>Specifies that this pattern expects at least one occurrence of a matching event.</p>
+                            <p>By default a relaxed internal contiguity (between subsequent events) is used. For more info on the 
+                            internal contiguity see <a href="#consecutive_scala">consecutive</a></p>
+      {% highlight scala %}
+      pattern.oneOrMore()
+      {% endhighlight %}
+          </td>
+       </tr>
+       <tr>
+                 <td><strong>times(#ofTimes)</strong></td>
+                 <td>
+                     <p>Specifies that this pattern expects an exact number of occurrences of a matching event.</p>
+                                   <p>By default a relaxed internal contiguity (between subsequent events) is used. 
+                                   For more info on the internal contiguity see <a href="#consecutive_scala">consecutive</a></p>
+             {% highlight scala %}
+             pattern.times(2)
+             {% endhighlight %}
+                 </td>
+       </tr>
+       <tr>
+          <td><strong>optional()</strong></td>
+          <td>
+             <p>Specifies that this pattern is optional, i.e. it may not occur at all. This is applicable to all 
+                           aforementioned quantifiers.</p>
+      {% highlight scala %}
+      pattern.oneOrMore().optional()
+      {% endhighlight %}
+          </td>
+       </tr>
+       <tr>
+          <td><strong>consecutive()</strong><a name="consecutive_scala"></a></td>
+          <td>
+            <p>Works in conjunction with oneOrMore() and times() and imposes strict contiguity between the matching 
+                          events, i.e. any non-matching element breaks the match (as in next()).</p>
+                          <p>If not applied a relaxed contiguity (as in followedBy()) is used.</p>
+            
+      <p>E.g. a pattern like:</p> 
+      {% highlight scala %}
+      Pattern.begin("start").where(_.getName().equals("c"))
+       .followedBy("middle").where(_.getName().equals("a"))
+                            .oneOrMore().consecutive()
+       .followedBy("end1").where(_.getName().equals("b"));
+      {% endhighlight %}
+
+            <p>Will generate the following matches for an input sequence: C D A1 A2 A3 D A4 B</p>
+                        
+                          <p>with consecutive applied: {C A1 B}, {C A1 A2 B}, {C A1 A2 A3 B}</p>
+                          <p>without consecutive applied: {C A1 B}, {C A1 A2 B}, {C A1 A2 A3 B}, {C A1 A2 A3 A4 B}</p>
+          </td>
+       </tr>
+       <tr>
+              <td><strong>allowCombinations()</strong><a name="allow_comb_java"></a></td>
+              <td>
+                <p>Works in conjunction with oneOrMore() and times() and imposes non-deterministic relaxed contiguity 
+                     between the matching events (as in followedByAny()).</p>
+                     <p>If not applied a relaxed contiguity (as in followedBy) is used.</p>
+                          
+      <p>E.g. a pattern like:</p>
+      {% highlight scala %}
+      Pattern.begin("start").where(_.getName().equals("c"))
+       .followedBy("middle").where(_.getName().equals("a"))
+                            .oneOrMore().allowCombinations()
+       .followedBy("end1").where(_.getName().equals("b"));
+      {% endhighlight %}
+                     
+                      <p>Will generate the following matches for an input sequence: C D A1 A2 A3 D A4 B</p>
+                          
+                      <p>with combinations enabled: {C A1 B}, {C A1 A2 B}, {C A1 A3 B}, {C A1 A4 B}, {C A1 A2 A3 B}, {C A1 A2 A4 B}, {C A1 A3 A4 B}, {C A1 A2 A3 A4 B}</p>
+                      <p>without combinations enabled: {C A1 B}, {C A1 A2 B}, {C A1 A2 A3 B}, {C A1 A2 A3 A4 B}</p>
+              </td>
+              </tr>
+  </tbody>
+</table>
 </div>
+
 </div>
 
-Non-strict contiguity means that other events are allowed to occur in-between two matching events.
-A non-strict contiguity pattern state can be created via the `followedBy` or `followedByAny` method.
+### Combining Patterns
+
+Now that we have seen what an individual pattern can look like, it is time to see how to combine them 
+into a full pattern sequence.
+
+A pattern sequence has to start with an initial pattern, as shown below:
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 {% highlight java %}
-Pattern<Event, ?> nonStrictNext = start.followedBy("middle");
+Pattern<Event, ?> start = Pattern.<Event>begin("start");
 {% endhighlight %}
 </div>
 
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
-val nonStrictNext : Pattern[Event, _] = start.followedBy("middle")
+val start : Pattern[Event, _] = Pattern.begin("start")
 {% endhighlight %}
 </div>
 </div>
 
-For non-strict contiguity one can specify if only the first succeeding matching event will be matched, or
-all. In the latter case multiple matches will be emitted for the same beginning.
+Next, you can append more patterns to your pattern sequence by specifying the desired *contiguity conditions* between 
+them. In the [previous paragraph](#conditions-on-contiguity), we described the different contiguity modes supported by
+Flink, namely *strict*, *relaxed*, and *non-deterministic relaxed*, and how to apply them in looping patterns. To apply 
+them between consecutive patterns, you can use: 
+
+1. `next()`, for *strict*, 
+2. `followedBy()`, for *relaxed*, and 
+3. `followedByAny()`, for *non-deterministic relaxed* contiguity.
+
+or 
+
+1. `notNext()`, if you do not want an event type to directly follow another
+2. `notFollowedBy()`, if you do not want an event type to be anywhere between two other event types
+
+<span class="label label-danger">Attention</span> A pattern sequence cannot end in `notFollowedBy()`.
+
+<span class="label label-danger">Attention</span> A `NOT` pattern cannot be preceded by an optional one.
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 {% highlight java %}
-Pattern<Event, ?> nonStrictNext = start.followedByAny("middle");
+
+// strict contiguity
+Pattern<Event, ?> strict = start.next("middle").where(...);
+
+// relaxed contiguity
+Pattern<Event, ?> relaxed = start.followedBy("middle").where(...);
+
+// non-deterministic relaxed contiguity
+Pattern<Event, ?> nonDetermin = start.followedByAny("middle").where(...);
+
+// NOT pattern with strict contiguity
+Pattern<Event, ?> strictNot = start.notNext("not").where(...);
+
+// NOT pattern with relaxed contiguity
+Pattern<Event, ?> relaxedNot = start.notFollowedBy("not").where(...);
+
 {% endhighlight %}
 </div>
 
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
-val nonStrictNext : Pattern[Event, _] = start.followedByAny("middle")
+
+// strict contiguity
+val strict: Pattern[Event, _] = start.next("middle").where(...)
+
+// relaxed contiguity
+val relaxed: Pattern[Event, _] = start.followedBy("middle").where(...)
+
+// non-deterministic relaxed contiguity
+val nonDetermin: Pattern[Event, _] = start.followedByAny("middle").where(...)
+
+// NOT pattern with strict contiguity
+val strictNot: Pattern[Event, _] = start.notNext("not").where(...)
+
+// NOT pattern with relaxed contiguity
+val relaxedNot: Pattern[Event, _] = start.notFollowedBy("not").where(...)
+
 {% endhighlight %}
 </div>
-
 </div>
-It is also possible to define a temporal constraint for the pattern to be valid.
-For example, one can define that a pattern should occur within 10 seconds via the `within` method. 
+
+Bear in mind that relaxed contiguity means that only the first succeeding matching event will be matched, while
+with non-deterministic relaxed contiguity, multiple matches will be emitted for the same beginning. As an example,
+a pattern `a b`, given the event sequence `"a", "c", "b1", "b2"` will give the following results:
+
+1. Strict Contiguity between `a` and `b`: `{}` (no match) -- the `"c"` after `"a"` causes `"a"` to be discarded.
+
+2. Relaxed Contiguity between `a` and `b`: `{a b1}` -- as relaxed continuity is viewed as "skip non-matching events 
+till the next matching one".
+ 
+3. Non-Deterministic Relaxed Contiguity between `a` and `b`: `{a b1}`, `{a b2}` -- as this is the most general form.
+
+Finally, it is also possible to define a temporal constraint for the pattern to be valid.
+For example, you can define that a pattern should occur within 10 seconds via the `pattern.within()` method. 
 Temporal patterns are supported for both [processing and event time]({{site.baseurl}}/dev/event_time.html).
 
+<span class="label label-danger">Attention</span> A pattern sequence can only have one temporal constraint. If 
+multiple such constraints are defined on different individual patterns, then the smallest one is applied.
+
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 {% highlight java %}
@@ -332,162 +767,76 @@ next.within(Time.seconds(10))
     </thead>
     <tbody>
         <tr>
-            <td><strong>Begin</strong></td>
+            <td><strong>begin()</strong></td>
             <td>
-            <p>Defines a starting pattern state:</p>
+            <p>Defines a starting pattern:</p>
 {% highlight java %}
 Pattern<Event, ?> start = Pattern.<Event>begin("start");
 {% endhighlight %}
             </td>
         </tr>
         <tr>
-            <td><strong>Next</strong></td>
+            <td><strong>next()</strong></td>
             <td>
-                <p>Appends a new pattern state. A matching event has to directly succeed the previous matching event:</p>
+                <p>Appends a new pattern. A matching event has to directly succeed the previous matching event 
+                (strict contiguity):</p>
 {% highlight java %}
-Pattern<Event, ?> next = start.next("next");
+Pattern<Event, ?> next = start.next("middle");
 {% endhighlight %}
             </td>
         </tr>
         <tr>
-            <td><strong>FollowedBy</strong></td>
+            <td><strong>followedBy()</strong></td>
             <td>
-                <p>Appends a new pattern state. Other events can occur between a matching event and the previous matching event:</p>
+                <p>Appends a new pattern. Other events can occur between a matching event and the previous 
+                matching event (relaxed contiguity):</p>
 {% highlight java %}
-Pattern<Event, ?> followedBy = start.followedBy("next");
+Pattern<Event, ?> followedBy = start.followedBy("middle");
 {% endhighlight %}
             </td>
         </tr>
         <tr>
-            <td><strong>Where</strong></td>
+            <td><strong>followedByAny()</strong></td>
             <td>
-                <p>Defines a condition for the current pattern state. Only if an event satisifes the condition, it can match the state:</p>
+                <p>Appends a new pattern. Other events can occur between a matching event and the previous 
+                matching event, and alternative matches will be presented for every alternative matching event 
+                (non-deterministic relaxed contiguity):</p>
 {% highlight java %}
-patternState.where(new IterativeCondition<Event>() {
-    @Override
-    public boolean filter(Event value, Context ctx) throws Exception {
-        return ... // some condition
-    }
-});
+Pattern<Event, ?> followedByAny = start.followedByAny("middle");
 {% endhighlight %}
-            </td>
+                    </td>
         </tr>
         <tr>
-            <td><strong>Or</strong></td>
-            <td>
-                <p>Adds a new filter condition which is ORed with an existing filter condition. Only if an event passes the filter condition, it can match the state:</p>
-{% highlight java %}
-patternState.where(new IterativeCondition<Event>() {
-    @Override
-    public boolean filter(Event value, Context ctx) throws Exception {
-        return ... // some condition
-    }
-}).or(new IterativeCondition<Event>() {
-    @Override
-    public boolean filter(Event value, Context ctx) throws Exception {
-        return ... // alternative condition
-    }
-});
-{% endhighlight %}
+                    <td><strong>notNext()</strong></td>
+                    <td>
+                        <p>Appends a new negative pattern. A matching (negative) event has to directly succeed the 
+                        previous matching event (strict contiguity) for the partial match to be discarded:</p>
+        {% highlight java %}
+        Pattern<Event, ?> notNext = start.notNext("not");
+        {% endhighlight %}
+                    </td>
+                </tr>
+                <tr>
+                    <td><strong>notFollowedBy()</strong></td>
+                    <td>
+                        <p>Appends a new negative pattern. A partial matching event sequence will be discarded even
+                        if other events occur between the matching (negative) event and the previous matching event 
+                        (relaxed contiguity):</p>
+        {% highlight java %}
+        Pattern<Event, ?> notFollowedBy = start.notFllowedBy("not");
+        {% endhighlight %}
                     </td>
                 </tr>
        <tr>
-           <td><strong>Subtype</strong></td>
-           <td>
-               <p>Defines a subtype condition for the current pattern state. Only if an event is of this subtype, it can match the state:</p>
-{% highlight java %}
-patternState.subtype(SubEvent.class);
-{% endhighlight %}
-           </td>
-       </tr>
-       <tr>
-          <td><strong>Within</strong></td>
+          <td><strong>within(time)</strong></td>
           <td>
-              <p>Defines the maximum time interval for an event sequence to match the pattern. If a non-completed event sequence exceeds this time, it is discarded:</p>
+              <p>Defines the maximum time interval for an event sequence to match the pattern. If a non-completed event 
+              sequence exceeds this time, it is discarded:</p>
 {% highlight java %}
-patternState.within(Time.seconds(10));
+pattern.within(Time.seconds(10));
 {% endhighlight %}
           </td>
        </tr>
-       <tr>
-          <td><strong>ZeroOrMore</strong></td>
-          <td>
-              <p>Specifies that this pattern can occur zero or more times(kleene star). This means any number of events can be matched in this state.</p>
-              <p>If eagerness is enabled(by default) for a pattern A*B and sequence A1 A2 B will generate patterns: B, A1 B and A1 A2 B. If disabled B, A1 B, A2 B and A1 A2 B.</p>
-              <p>By default a relaxed internal continuity (between subsequent events of a loop) is used. For more info on the internal continuity see <a href="#consecutive_java">consecutive</a></p>
-      {% highlight java %}
-      patternState.zeroOrMore();
-      {% endhighlight %}
-          </td>
-       </tr>
-       <tr>
-          <td><strong>OneOrMore</strong></td>
-          <td>
-              <p>Specifies that this pattern can occur one or more times(kleene star). This means at least one and at most infinite number of events can be matched in this state.</p>
-              <p>If eagerness is enabled (by default) for a pattern A*B and sequence A1 A2 B will generate patterns: A1 B and A1 A2 B. If disabled A1 B, A2 B and A1 A2 B.</p>
-              <p>By default a relaxed internal continuity (between subsequent events of a loop) is used. For more info on the internal continuity see <a href="#consecutive_java">consecutive</a></p>
-      {% highlight java %}
-      patternState.oneOrMore();
-      {% endhighlight %}
-          </td>
-       </tr>
-       <tr>
-          <td><strong>Optional</strong></td>
-          <td>
-              <p>Specifies that this pattern can occur zero or once.</p>
-      {% highlight java %}
-      patternState.optional();
-      {% endhighlight %}
-          </td>
-       </tr>
-       <tr>
-          <td><strong>Times</strong></td>
-          <td>
-              <p>Specifies exact number of times that this pattern should be matched.</p>
-              <p>By default a relaxed internal continuity (between subsequent events of a loop) is used. For more info on the internal continuity see <a href="#consecutive_java">consecutive</a></p>
-      {% highlight java %}
-      patternState.times(2);
-      {% endhighlight %}
-          </td>
-       </tr>
-       <tr>
-          <td><strong>Consecutive</strong><a name="consecutive_java"></a></td>
-          <td>
-              <p>Works in conjunction with zeroOrMore, oneOrMore or times. Specifies that any not matching element breaks the loop.</p>
-              
-              <p>If not applied a relaxed continuity (as in followedBy) is used.</p>
-
-          <p>E.g. a pattern like:</p>
-      {% highlight java %}
-      Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
-           @Override
-           public boolean filter(Event value) throws Exception {
-               return value.getName().equals("c");
-           }
-      })
-      .followedBy("middle").where(new SimpleCondition<Event>() {
-           @Override
-           public boolean filter(Event value) throws Exception {
-               return value.getName().equals("a");
-           }
-      })
-      .oneOrMore(true).consecutive()
-      .followedBy("end1").where(new SimpleCondition<Event>() {
-           @Override
-           public boolean filter(Event value) throws Exception {
-               return value.getName().equals("b");
-           }
-      });
-      {% endhighlight %}
-
-             <p>Will generate the following matches for a sequence: C D A1 A2 A3 D A4 B</p>
-
-             <p>with consecutive applied: {C A1 B}, {C A1 A2 B}, {C A1 A2 A3 B}</p>
-             <p>without consecutive applied: {C A1 B}, {C A1 A2 B}, {C A1 A2 A3 B}, {C A1 A2 A3 A4 B}</p>
-
-             <p><b>NOTICE:</b> This option can be applied only to zeroOrMore(), oneOrMore() and times()!</p>
-          </td>
-       </tr>
   </tbody>
 </table>
 </div>
@@ -502,142 +851,90 @@ patternState.within(Time.seconds(10));
     </thead>
     <tbody>
         <tr>
-            <td><strong>Begin</strong></td>
+            <td><strong>begin()</strong></td>
             <td>
-            <p>Defines a starting pattern state:</p>
+            <p>Defines a starting pattern:</p>
 {% highlight scala %}
 val start = Pattern.begin[Event]("start")
 {% endhighlight %}
             </td>
         </tr>
         <tr>
-            <td><strong>Next</strong></td>
+            <td><strong>next()</strong></td>
             <td>
-                <p>Appends a new pattern state. A matching event has to directly succeed the previous matching event:</p>
+                <p>Appends a new pattern. A matching event has to directly succeed the previous matching event 
+                (strict contiguity):</p>
 {% highlight scala %}
 val next = start.next("middle")
 {% endhighlight %}
             </td>
         </tr>
         <tr>
-            <td><strong>FollowedBy</strong></td>
+            <td><strong>followedBy()</strong></td>
             <td>
-                <p>Appends a new pattern state. Other events can occur between a matching event and the previous matching event:</p>
+                <p>Appends a new pattern. Other events can occur between a matching event and the previous 
+                matching event (relaxed contiguity) :</p>
 {% highlight scala %}
 val followedBy = start.followedBy("middle")
 {% endhighlight %}
             </td>
         </tr>
         <tr>
-            <td><strong>Where</strong></td>
-            <td>
-                <p>Defines a filter condition for the current pattern state. Only if an event passes the filter, it can match the state:</p>
-{% highlight scala %}
-patternState.where(event => ... /* some condition */)
-{% endhighlight %}
-            </td>
-        </tr>
-        <tr>
-            <td><strong>Or</strong></td>
-            <td>
-                <p>Adds a new filter condition which is ORed with an existing filter condition. Only if an event passes the filter condition, it can match the state:</p>
-{% highlight scala %}
-patternState.where(event => ... /* some condition */)
-    .or(event => ... /* alternative condition */)
-{% endhighlight %}
-                    </td>
+                    <td><strong>followedByAny()</strong></td>
+                    <td>
+                        <p>Appends a new pattern. Other events can occur between a matching event and the previous 
+                        matching event, and alternative matches will be presented for every alternative matching event 
+                        (non-deterministic relaxed contiguity):</p>
+        {% highlight scala %}
+       val followedByAny = start.followedByAny("middle");
+        {% endhighlight %}
+                            </td>
                 </tr>
+                
+                <tr>
+                                    <td><strong>notNext()</strong></td>
+                                    <td>
+                                        <p>Appends a new negative pattern. A matching (negative) event has to directly succeed the 
+                                        previous matching event (strict contiguity) for the partial match to be discarded:</p>
+                        {% highlight scala %}
+                        val notNext = start.notNext("not")
+                        {% endhighlight %}
+                                    </td>
+                                </tr>
+                                <tr>
+                                    <td><strong>notFollowedBy()</strong></td>
+                                    <td>
+                                        <p>Appends a new negative pattern. A partial matching event sequence will be discarded even
+                                        if other events occur between the matching (negative) event and the previous matching event 
+                                        (relaxed contiguity):</p>
+                        {% highlight scala %}
+                        val notFollowedBy = start.notFllowedBy("not")
+                        {% endhighlight %}
+                                    </td>
+                                </tr>
+        
        <tr>
-           <td><strong>Subtype</strong></td>
-           <td>
-               <p>Defines a subtype condition for the current pattern state. Only if an event is of this subtype, it can match the state:</p>
-{% highlight scala %}
-patternState.subtype(classOf[SubEvent])
-{% endhighlight %}
-           </td>
-       </tr>
-       <tr>
-          <td><strong>Within</strong></td>
+          <td><strong>within(time)</strong></td>
           <td>
-              <p>Defines the maximum time interval for an event sequence to match the pattern. If a non-completed event sequence exceeds this time, it is discarded:</p>
+              <p>Defines the maximum time interval for an event sequence to match the pattern. If a non-completed event 
+              sequence exceeds this time, it is discarded:</p>
 {% highlight scala %}
-patternState.within(Time.seconds(10))
+pattern.within(Time.seconds(10))
 {% endhighlight %}
           </td>
       </tr>
        <tr>
-          <td><strong>ZeroOrMore</strong></td>
-          <td>
-              <p>Specifies that this pattern can occur zero or more times(kleene star). This means any number of events can be matched in this state.</p>
-              <p>If eagerness is enabled(by default) for a pattern A*B and sequence A1 A2 B will generate patterns: B, A1 B and A1 A2 B. If disabled B, A1 B, A2 B and A1 A2 B.</p>
-              <p>By default a relaxed internal continuity (between subsequent events of a loop) is used. For more info on the internal continuity see <a href="#consecutive_scala">consecutive</a></p>
-      {% highlight scala %}
-      patternState.zeroOrMore()
-      {% endhighlight %}
-          </td>
-       </tr>
-       <tr>
-          <td><strong>OneOrMore</strong></td>
-          <td>
-              <p>Specifies that this pattern can occur one or more times(kleene star). This means at least one and at most infinite number of events can be matched in this state.</p>
-              <p>If eagerness is enabled (by default) for a pattern A*B and sequence A1 A2 B will generate patterns: A1 B and A1 A2 B. If disabled A1 B, A2 B and A1 A2 B.</p>
-              <p>By default a relaxed internal continuity (between subsequent events of a loop) is used. For more info on the internal continuity see <a href="#consecutive_scala">consecutive</a></p>
-      {% highlight scala %}
-      patternState.oneOrMore()
-      {% endhighlight %}
-          </td>
-       </tr>
-       <tr>
-          <td><strong>Optional</strong></td>
-          <td>
-              <p>Specifies that this pattern can occur zero or once.</p>
-      {% highlight scala %}
-      patternState.optional()
-      {% endhighlight %}
-          </td>
-       </tr>
-       <tr>
-          <td><strong>Times</strong></td>
-          <td>
-              <p>Specifies exact number of times that this pattern should be matched.</p>
-              <p>By default a relaxed internal continuity (between subsequent events of a loop) is used. For more info on the internal continuity see <a href="#consecutive_scala">consecutive</a></p>
-      {% highlight scala %}
-      patternState.times(2)
-      {% endhighlight %}
-          </td>
-       </tr>
-       <tr>
-          <td><strong>Consecutive</strong><a name="consecutive_scala"></a></td>
-          <td>
-            <p>Works in conjunction with zeroOrMore, oneOrMore or times. Specifies that any not matching element breaks the loop.</p>
-            
-            <p>If not applied a relaxed continuity (as in followedBy) is used.</p>
-            
-      {% highlight scala %}
-      Pattern.begin("start").where(_.getName().equals("c"))
-       .followedBy("middle").where(_.getName().equals("a"))
-                            .oneOrMore(true).consecutive()
-       .followedBy("end1").where(_.getName().equals("b"));
-      {% endhighlight %}
-
-            <p>Will generate the following matches for a sequence: C D A1 A2 A3 D A4 B</p>
-
-            <p>with consecutive applied: {C A1 B}, {C A1 A2 B}, {C A1 A2 A3 B}</p>
-            <p>without consecutive applied: {C A1 B}, {C A1 A2 B}, {C A1 A2 A3 B}, {C A1 A2 A3 A4 B}</p>
-
-            <p><b>NOTICE:</b> This option can be applied only to zeroOrMore(), oneOrMore() and times()!</p>
-          </td>
-       </tr>
   </tbody>
 </table>
 </div>
 
 </div>
 
-### Detecting Patterns
+## Detecting Patterns
 
-In order to run a stream of events against your pattern, you have to create a `PatternStream`.
-Given an input stream `input` and a pattern `pattern`, you create the `PatternStream` by calling
+After specifying the pattern sequence you are looking for, it is time to apply it to your input stream to detect 
+potential matches. In order to run a stream of events against your pattern sequence, you have to create a `PatternStream`.
+Given an input stream `input` and a pattern `pattern`, you create the `PatternStream` by calling:
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
@@ -659,37 +956,46 @@ val patternStream: PatternStream[Event] = CEP.pattern(input, pattern)
 </div>
 </div>
 
+The input stream can be *keyed* or *non-keyed* depending on your use-case.
+
+<span class="label label-danger">Attention</span> Applying your pattern on a non-keyed stream will result is a job with 
+parallelism equal to 1.
+
 ### Selecting from Patterns
+
 Once you have obtained a `PatternStream` you can select from detected event sequences via the `select` or `flatSelect` methods.
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
-The `select` method requires a `PatternSelectFunction` implementation.
+The `select()` method requires a `PatternSelectFunction` implementation.
 A `PatternSelectFunction` has a `select` method which is called for each matching event sequence.
-It receives a map of string/event pairs of the matched events.
-The string is defined by the name of the state to which the event has been matched.
-The `select` method can return exactly one result.
+It receives a match in the form of `Map<String, List<IN>>` where the key is the name of each pattern in your pattern 
+sequence and the value is a list of all accepted events for that pattern (`IN` is the type of your input elements). 
+The events for a given pattern are ordered by timestamp. The reason for returning a list of accepted events for each 
+pattern is that when using looping patterns (e.g. `oneToMany()` and `times()`), more than one event may be accepted for a 
+given pattern. The selection function returns exactly one result.
 
 {% highlight java %}
 class MyPatternSelectFunction<IN, OUT> implements PatternSelectFunction<IN, OUT> {
     @Override
-    public OUT select(Map<String, IN> pattern) {
-        IN startEvent = pattern.get("start");
-        IN endEvent = pattern.get("end");
+    public OUT select(Map<String, List<IN>> pattern) {
+        IN startEvent = pattern.get("start").get(0);
+        IN endEvent = pattern.get("end").get(0);
         return new OUT(startEvent, endEvent);
     }
 }
 {% endhighlight %}
 
-A `PatternFlatSelectFunction` is similar to the `PatternSelectFunction`, with the only distinction that it can return an arbitrary number of results.
-In order to do this, the `select` method has an additional `Collector` parameter which is used for the element output.
+A `PatternFlatSelectFunction` is similar to the `PatternSelectFunction`, with the only distinction that it can return an 
+arbitrary number of results. In order to do this, the `select` method has an additional `Collector` parameter which is 
+used to forward your output elements downstream.
 
 {% highlight java %}
 class MyPatternFlatSelectFunction<IN, OUT> implements PatternFlatSelectFunction<IN, OUT> {
     @Override
-    public void select(Map<String, IN> pattern, Collector<OUT> collector) {
-        IN startEvent = pattern.get("start");
-        IN endEvent = pattern.get("end");
+    public void select(Map<String, List<IN>> pattern, Collector<OUT> collector) {
+        IN startEvent = pattern.get("start").get(0);
+        IN endEvent = pattern.get("end").get(0);
 
         for (int i = 0; i < startEvent.getValue(); i++ ) {
             collector.collect(new OUT(startEvent, endEvent));
@@ -700,26 +1006,29 @@ class MyPatternFlatSelectFunction<IN, OUT> implements PatternFlatSelectFunction<
 </div>
 
 <div data-lang="scala" markdown="1">
-The `select` method takes a selection function as argument, which is called for each matching event sequence.
-It receives a map of string/event pairs of the matched events.
-The string is defined by the name of the state to which the event has been matched.
-The selection function returns exactly one result per call.
+The `select()` method takes a selection function as argument, which is called for each matching event sequence.
+It receives a match in the form of `Map[String, Iterable[IN]]` where the key is the name of each pattern in your pattern 
+sequence and the value is an Iterable over all accepted events for that pattern (`IN` is the type of your input elements). 
+The events for a given pattern are ordered by timestamp. The reason for returning an iterable of accepted events for each 
+pattern is that when using looping patterns (e.g. `oneToMany()` and `times()`), more than one event may be accepted for a 
+given pattern. The selection function returns exactly one result per call.
 
 {% highlight scala %}
-def selectFn(pattern : mutable.Map[String, IN]): OUT = {
-    val startEvent = pattern.get("start").get
-    val endEvent = pattern.get("end").get
+def selectFn(pattern : Map[String, Iterable[IN]]): OUT = {
+    val startEvent = pattern.get("start").get.next
+    val endEvent = pattern.get("end").get.next
     OUT(startEvent, endEvent)
 }
 {% endhighlight %}
 
-The `flatSelect` method is similar to the `select` method. Their only difference is that the function passed to the `flatSelect` method can return an arbitrary number of results per call.
-In order to do this, the function for `flatSelect` has an additional `Collector` parameter which is used for the element output.
+The `flatSelect` method is similar to the `select` method. Their only difference is that the function passed to the 
+`flatSelect` method can return an arbitrary number of results per call. In order to do this, the function for 
+`flatSelect` has an additional `Collector` parameter which is used to forward your output elements downstream.
 
 {% highlight scala %}
-def flatSelectFn(pattern : mutable.Map[String, IN], collector : Collector[OUT]) = {
-    val startEvent = pattern.get("start").get
-    val endEvent = pattern.get("end").get
+def flatSelectFn(pattern : Map[String, Iterable[IN]], collector : Collector[OUT]) = {
+    val startEvent = pattern.get("start").get.next
+    val endEvent = pattern.get("end").get.next
     for (i <- 0 to startEvent.getValue) {
         collector.collect(OUT(startEvent, endEvent))
     }
@@ -730,17 +1039,19 @@ def flatSelectFn(pattern : mutable.Map[String, IN], collector : Collector[OUT])
 
 ### Handling Timed Out Partial Patterns
 
-Whenever a pattern has a window length associated via the `within` keyword, it is possible that partial event patterns will be discarded because they exceed the window length.
-In order to react to these timeout events the `select` and `flatSelect` API calls allow a timeout handler to be specified.
-This timeout handler is called for each partial event pattern which has timed out.
-The timeout handler receives all the events that have been matched so far by the pattern, and the timestamp when the timeout was detected.
-
+Whenever a pattern has a window length attached via the `within` keyword, it is possible that partial event sequences 
+are discarded because they exceed the window length. In order to react to these timed out partial matches the `select` 
+and `flatSelect` API calls allow a timeout handler to be specified. This timeout handler is called for each timed out 
+partial event sequence. The timeout handler receives all the events that have been matched so far by the pattern, and 
+the timestamp when the timeout was detected.
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
-In order to treat partial patterns, the `select` and `flatSelect` API calls offer an overloaded version which takes as the first parameter a `PatternTimeoutFunction`/`PatternFlatTimeoutFunction` and as second parameter the known `PatternSelectFunction`/`PatternFlatSelectFunction`.
-The return type of the timeout function can be different from the select function.
-The timeout event and the select event are wrapped in `Either.Left` and `Either.Right` respectively so that the resulting data stream is of type `org.apache.flink.types.Either`.
+In order to treat partial patterns, the `select` and `flatSelect` API calls offer an overloaded version which takes as 
+the first parameter a `PatternTimeoutFunction`/`PatternFlatTimeoutFunction` and as second parameter the known 
+`PatternSelectFunction`/`PatternFlatSelectFunction`. The return type of the timeout function can be different from the 
+select function. The timeout event and the select event are wrapped in `Either.Left` and `Either.Right` respectively 
+so that the resulting data stream is of type `org.apache.flink.types.Either`.
 
 {% highlight java %}
 PatternStream<Event> patternStream = CEP.pattern(input, pattern);
@@ -761,7 +1072,7 @@ DataStream<Either<TimeoutEvent, ComplexEvent>> flatResult = patternStream.flatSe
 <div data-lang="scala" markdown="1">
 In order to treat partial patterns, the `select` API call offers an overloaded version which takes as the first parameter a timeout function and as second parameter a selection function.
 The timeout function is called with a map of string-event pairs of the partial match which has timed out and a long indicating when the timeout occurred.
-The string is defined by the name of the state to which the event has been matched.
+The string is defined by the name of the pattern to which the event has been matched.
 The timeout function returns exactly one result per call.
 The return type of the timeout function can be different from the select function.
 The timeout event and the select event are wrapped in `Left` and `Right` respectively so that the resulting data stream is of type `Either`.
@@ -770,9 +1081,9 @@ The timeout event and the select event are wrapped in `Left` and `Right` respect
 val patternStream: PatternStream[Event] = CEP.pattern(input, pattern)
 
 DataStream[Either[TimeoutEvent, ComplexEvent]] result = patternStream.select{
-    (pattern: mutable.Map[String, Event], timestamp: Long) => TimeoutEvent()
+    (pattern: Map[String, Iterable[Event]], timestamp: Long) => TimeoutEvent()
 } {
-    pattern: mutable.Map[String, Event] => ComplexEvent()
+    pattern: Map[String, Iterable[Event]] => ComplexEvent()
 }
 {% endhighlight %}
 
@@ -784,10 +1095,10 @@ The collector can be used to emit an arbitrary number of events.
 val patternStream: PatternStream[Event] = CEP.pattern(input, pattern)
 
 DataStream[Either[TimeoutEvent, ComplexEvent]] result = patternStream.flatSelect{
-    (pattern: mutable.Map[String, Event], timestamp: Long, out: Collector[TimeoutEvent]) =>
+    (pattern: Map[String, Iterable[Event]], timestamp: Long, out: Collector[TimeoutEvent]) =>
         out.collect(TimeoutEvent())
 } {
-    (pattern: mutable.Map[String, Event], out: Collector[ComplexEvent]) =>
+    (pattern: mutable.Map[String, Iterable[Event]], out: Collector[ComplexEvent]) =>
         out.collect(ComplexEvent())
 }
 {% endhighlight %}
@@ -795,7 +1106,7 @@ DataStream[Either[TimeoutEvent, ComplexEvent]] result = patternStream.flatSelect
 </div>
 </div>
 
-### Handling Lateness in Event Time
+## Handling Lateness in Event Time
 
 In `CEP` the order in which elements are processed matters. To guarantee that elements are processed in the correct order
 when working in event time, an incoming element is initially put in a buffer where elements are *sorted in ascending 
@@ -811,8 +1122,8 @@ seen watermark. Late elements are not further processed.
 
 ## Examples
 
-The following example detects the pattern `start, middle(name = "error") -> end(name = "critical")` on a keyed data stream of `Events`.
-The events are keyed by their ids and a valid pattern has to occur within 10 seconds.
+The following example detects the pattern `start, middle(name = "error") -> end(name = "critical")` on a keyed data 
+stream of `Events`. The events are keyed by their ids and a valid pattern has to occur within 10 seconds.
 The whole processing is done with event time.
 
 <div class="codetabs" markdown="1">
@@ -847,7 +1158,7 @@ PatternStream<Event> patternStream = CEP.pattern(partitionedInput, pattern);
 
 DataStream<Alert> alerts = patternStream.select(new PatternSelectFunction<Event, Alert>() {
 	@Override
-	public Alert select(Map<String, Event> pattern) throws Exception {
+	public Alert select(Map<String, List<Event>> pattern) throws Exception {
 		return createAlert(pattern);
 	}
 });
@@ -874,3 +1185,23 @@ val alerts = patternStream.select(createAlert(_)))
 {% endhighlight %}
 </div>
 </div>
+
+## Migrating from an older Flink version
+
+The CEP library in Flink-1.3 ships with a number of new features which have led to some changes in the API. Here we 
+describe the changes that you need to make to your old CEP jobs, in order to be able to run them with Flink-1.3. After 
+making these changes and recompiling your job, you will be able to resume its execution from a savepoint taken with the 
+old version of your job, *i.e.* without having to re-process your past data.
+
+The changes required are:
+
+1. Change your conditions (the ones in the `where(...)` clause) to extend the `SimpleCondition` class instead of 
+implementing the `FilterFunction` interface.
+
+2. Change your functions provided as arguments to the `select(...)` and `flatSelect(...)` methods to expect a list of
+events associated with each pattern (`List` in `Java`, `Iterable` in `Scala`). This is because with the addition of
+the looping patterns, multiple input events can match a single (looping) pattern.
+
+3. The `followedBy()` in Flink 1.1 and 1.2 implied `non-deterministic relaxed contiguity` (see 
+[here](#conditions-on-contiguity)). In Flink 1.3 this has changed and `followedBy()` implies `relaxed contiguity`, 
+while `followedByAny()` should be used if `non-deterministic relaxed contiguity` is required.
\ No newline at end of file


Mime
View raw message