flink-commits mailing list archives

From dwysakow...@apache.org
Subject [2/3] flink git commit: [FLINK-6927] [cep] Support pattern group in CEP
Date Wed, 05 Jul 2017 09:56:29 GMT
[FLINK-6927] [cep] Support pattern group in CEP

This closes #4153


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3096bd03
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3096bd03
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3096bd03

Branch: refs/heads/master
Commit: 3096bd03324dfe1ebc55b400b68a4b5a6c6ef8b0
Parents: 4d18afe
Author: Dian Fu <fudian.fd@alibaba-inc.com>
Authored: Wed Jun 21 16:41:21 2017 +0800
Committer: Dawid Wysakowicz <dwysakowicz@apache.org>
Committed: Wed Jul 5 10:46:00 2017 +0200

----------------------------------------------------------------------
 docs/dev/libs/cep.md                            |  214 +++-
 .../flink/cep/scala/pattern/GroupPattern.scala  |   56 +
 .../flink/cep/scala/pattern/Pattern.scala       |   43 +
 .../flink/cep/nfa/compiler/NFACompiler.java     |  318 ++++-
 .../cep/nfa/compiler/NFAStateNameHandler.java   |    8 +
 .../apache/flink/cep/pattern/GroupPattern.java  |   65 ++
 .../org/apache/flink/cep/pattern/Pattern.java   |   48 +
 .../apache/flink/cep/pattern/Quantifier.java    |    8 +-
 .../org/apache/flink/cep/nfa/GroupITCase.java   | 1087 ++++++++++++++++++
 9 files changed, 1775 insertions(+), 72 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/3096bd03/docs/dev/libs/cep.md
----------------------------------------------------------------------
diff --git a/docs/dev/libs/cep.md b/docs/dev/libs/cep.md
index 63f61f7..1ff057a 100644
--- a/docs/dev/libs/cep.md
+++ b/docs/dev/libs/cep.md
@@ -161,8 +161,10 @@ it to a looping one by using [Quantifiers](#quantifiers). In addition, each patt
 
 In FlinkCEP, looping patterns can be specified using these methods: `pattern.oneOrMore()`, for patterns that expect one or
 more occurrences of a given event (e.g. the `b+` mentioned previously); and `pattern.times(#ofTimes)`, for patterns that
-expect a specific number of occurrences of a given type of event, e.g. 4 `a`'s. All patterns, looping or not, can be made
-optional using the `pattern.optional()` method. For a pattern named `start`, the following are valid quantifiers:
+expect a specific number of occurrences of a given type of event, e.g. 4 `a`'s; and `pattern.times(#fromTimes, #toTimes)`,
+for patterns that expect between a minimum and a maximum number of occurrences of a given type of event,
+e.g. 2 to 4 `a`'s. All patterns, looping or not, can be made optional using the `pattern.optional()` method. For a pattern
+named `start`, the following are valid quantifiers:
 
  <div class="codetabs" markdown="1">
  <div data-lang="java" markdown="1">
@@ -173,6 +175,12 @@ optional using the `pattern.optional()` method. For a pattern named `start`, the
  // expecting 0 or 4 occurrences
  start.times(4).optional();
 
+ // expecting 2, 3 or 4 occurrences
+ start.times(2, 4);
+
+ // expecting 0, 2, 3 or 4 occurrences
+ start.times(2, 4).optional();
+
  // expecting 1 or more occurrences
  start.oneOrMore();
 
@@ -189,6 +197,12 @@ optional using the `pattern.optional()` method. For a pattern named `start`, the
  // expecting 0 or 4 occurrences
  start.times(4).optional()
 
+ // expecting 2, 3 or 4 occurrences
+ start.times(2, 4)
+
+ // expecting 0, 2, 3 or 4 occurrences
+ start.times(2, 4).optional()
+
  // expecting 1 or more occurrences
  start.oneOrMore()
 
@@ -475,6 +489,18 @@ pattern.times(2);
           </td>
        </tr>
        <tr>
+          <td><strong>times(#fromTimes, #toTimes)</strong></td>
+          <td>
+              <p>Specifies that this pattern expects between <strong>#fromTimes</strong>
+              and <strong>#toTimes</strong> occurrences of a matching event.</p>
+              <p>By default a relaxed internal contiguity (between subsequent events) is used. For more info on
+              internal contiguity see <a href="#consecutive_java">consecutive</a>.</p>
+{% highlight java %}
+pattern.times(2, 4);
+{% endhighlight %}
+          </td>
+       </tr>
+       <tr>
           <td><strong>optional()</strong></td>
           <td>
               <p>Specifies that this pattern is optional, i.e. it may not occur at all. This is applicable to all
@@ -633,6 +659,18 @@ pattern.times(2)
                  </td>
        </tr>
        <tr>
+         <td><strong>times(#fromTimes, #toTimes)</strong></td>
+         <td>
+             <p>Specifies that this pattern expects between <strong>#fromTimes</strong>
+             and <strong>#toTimes</strong> occurrences of a matching event.</p>
+             <p>By default a relaxed internal contiguity (between subsequent events) is used. For more info on
+             internal contiguity see <a href="#consecutive_scala">consecutive</a>.</p>
+{% highlight scala %}
+pattern.times(2, 4)
+{% endhighlight %}
+         </td>
+       </tr>
+       <tr>
           <td><strong>optional()</strong></td>
           <td>
              <p>Specifies that this pattern is optional, i.e. it may not occur at all. This is applicable to all
@@ -805,6 +843,63 @@ next.within(Time.seconds(10))
 </div>
 </div>
 
+It is also possible to define a pattern sequence as the condition for `begin`, `followedBy`, `followedByAny` and
+`next`. The pattern sequence is then treated logically as the matching condition, and a `GroupPattern` is
+returned. You can apply `oneOrMore()`, `times(#ofTimes)`, `times(#fromTimes, #toTimes)`, `optional()`,
+`consecutive()` and `allowCombinations()` to the `GroupPattern`.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+
+Pattern<Event, ?> start = Pattern.begin(
+    Pattern.<Event>begin("start").where(...).followedBy("start_middle").where(...)
+);
+
+// strict contiguity
+Pattern<Event, ?> strict = start.next(
+    Pattern.<Event>begin("next_start").where(...).followedBy("next_middle").where(...)
+).times(3);
+
+// relaxed contiguity
+Pattern<Event, ?> relaxed = start.followedBy(
+    Pattern.<Event>begin("followedby_start").where(...).followedBy("followedby_middle").where(...)
+).oneOrMore();
+
+// non-deterministic relaxed contiguity
+Pattern<Event, ?> nonDetermin = start.followedByAny(
+    Pattern.<Event>begin("followedbyany_start").where(...).followedBy("followedbyany_middle").where(...)
+).optional();
+
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+
+val start: Pattern[Event, _] = Pattern.begin(
+    Pattern.begin[Event]("start").where(...).followedBy("start_middle").where(...)
+)
+
+// strict contiguity
+val strict: Pattern[Event, _] = start.next(
+    Pattern.begin[Event]("next_start").where(...).followedBy("next_middle").where(...)
+).times(3)
+
+// relaxed contiguity
+val relaxed: Pattern[Event, _] = start.followedBy(
+    Pattern.begin[Event]("followedby_start").where(...).followedBy("followedby_middle").where(...)
+).oneOrMore()
+
+// non-deterministic relaxed contiguity
+val nonDetermin: Pattern[Event, _] = start.followedByAny(
+    Pattern.begin[Event]("followedbyany_start").where(...).followedBy("followedbyany_middle").where(...)
+).optional()
+
+{% endhighlight %}
+</div>
+</div>
+
 <br />
 
 <div class="codetabs" markdown="1">
@@ -818,7 +913,7 @@ next.within(Time.seconds(10))
     </thead>
     <tbody>
         <tr>
-            <td><strong>begin()</strong></td>
+            <td><strong>begin(#name)</strong></td>
             <td>
             <p>Defines a starting pattern:</p>
 {% highlight java %}
@@ -827,7 +922,18 @@ Pattern<Event, ?> start = Pattern.<Event>begin("start");
             </td>
         </tr>
         <tr>
-            <td><strong>next()</strong></td>
+            <td><strong>begin(#pattern_sequence)</strong></td>
+            <td>
+            <p>Defines a starting pattern:</p>
+{% highlight java %}
+Pattern<Event, ?> start = Pattern.<Event>begin(
+    Pattern.<Event>begin("start").where(...).followedBy("middle").where(...)
+);
+{% endhighlight %}
+            </td>
+        </tr>
+        <tr>
+            <td><strong>next(#name)</strong></td>
             <td>
                 <p>Appends a new pattern. A matching event has to directly succeed the previous matching event
                 (strict contiguity):</p>
@@ -837,7 +943,19 @@ Pattern<Event, ?> next = start.next("middle");
             </td>
         </tr>
         <tr>
-            <td><strong>followedBy()</strong></td>
+            <td><strong>next(#pattern_sequence)</strong></td>
+            <td>
+                <p>Appends a new pattern. A sequence of matching events has to directly succeed the previous matching event
+                (strict contiguity):</p>
+{% highlight java %}
+Pattern<Event, ?> next = start.next(
+    Pattern.<Event>begin("start").where(...).followedBy("middle").where(...)
+);
+{% endhighlight %}
+            </td>
+        </tr>
+        <tr>
+            <td><strong>followedBy(#name)</strong></td>
             <td>
                 <p>Appends a new pattern. Other events can occur between a matching event and the previous
                 matching event (relaxed contiguity):</p>
@@ -847,7 +965,19 @@ Pattern<Event, ?> followedBy = start.followedBy("middle");
             </td>
         </tr>
         <tr>
-            <td><strong>followedByAny()</strong></td>
+            <td><strong>followedBy(#pattern_sequence)</strong></td>
+            <td>
+                 <p>Appends a new pattern. Other events can occur between a sequence of matching events and the previous
+                 matching event (relaxed contiguity):</p>
+{% highlight java %}
+Pattern<Event, ?> followedBy = start.followedBy(
+    Pattern.<Event>begin("start").where(...).followedBy("middle").where(...)
+);
+{% endhighlight %}
+            </td>
+        </tr>
+        <tr>
+            <td><strong>followedByAny(#name)</strong></td>
             <td>
                 <p>Appends a new pattern. Other events can occur between a matching event and the previous
                 matching event, and alternative matches will be presented for every alternative matching event
@@ -855,7 +985,20 @@ Pattern<Event, ?> followedBy = start.followedBy("middle");
 {% highlight java %}
 Pattern<Event, ?> followedByAny = start.followedByAny("middle");
 {% endhighlight %}
-                    </td>
+             </td>
+        </tr>
+        <tr>
+             <td><strong>followedByAny(#pattern_sequence)</strong></td>
+             <td>
+                 <p>Appends a new pattern. Other events can occur between a sequence of matching events and the previous
+                 matching event, and alternative matches will be presented for every alternative sequence of matching events
+                 (non-deterministic relaxed contiguity):</p>
+{% highlight java %}
+Pattern<Event, ?> followedByAny = start.followedByAny(
+    Pattern.<Event>begin("start").where(...).followedBy("middle").where(...)
+);
+{% endhighlight %}
+             </td>
         </tr>
         <tr>
                     <td><strong>notNext()</strong></td>
@@ -911,7 +1054,7 @@ val start = Pattern.begin[Event]("start")
             </td>
         </tr>
         <tr>
-            <td><strong>next()</strong></td>
+            <td><strong>next(#name)</strong></td>
             <td>
                 <p>Appends a new pattern. A matching event has to directly succeed the previous matching event
                 (strict contiguity):</p>
@@ -921,7 +1064,19 @@ val next = start.next("middle")
             </td>
         </tr>
         <tr>
-            <td><strong>followedBy()</strong></td>
+            <td><strong>next(#pattern_sequence)</strong></td>
+            <td>
+                <p>Appends a new pattern. A sequence of matching events has to directly succeed the previous matching event
+                (strict contiguity):</p>
+{% highlight scala %}
+val next = start.next(
+    Pattern.begin[Event]("start").where(...).followedBy("middle").where(...)
+)
+{% endhighlight %}
+            </td>
+        </tr>
+        <tr>
+            <td><strong>followedBy(#name)</strong></td>
             <td>
                 <p>Appends a new pattern. Other events can occur between a matching event and the previous
                 matching event (relaxed contiguity) :</p>
@@ -931,16 +1086,41 @@ val followedBy = start.followedBy("middle")
             </td>
         </tr>
         <tr>
-                    <td><strong>followedByAny()</strong></td>
-                    <td>
-                        <p>Appends a new pattern. Other events can occur between a matching event and the previous
-                        matching event, and alternative matches will be presented for every alternative matching event
-                        (non-deterministic relaxed contiguity):</p>
+            <td><strong>followedBy(#pattern_sequence)</strong></td>
+            <td>
+                <p>Appends a new pattern. Other events can occur between a sequence of matching events and the previous
+                matching event (relaxed contiguity):</p>
 {% highlight scala %}
-val followedByAny = start.followedByAny("middle");
+val followedBy = start.followedBy(
+    Pattern.begin[Event]("start").where(...).followedBy("middle").where(...)
+)
 {% endhighlight %}
-                            </td>
-                </tr>
+            </td>
+        </tr>
+        <tr>
+            <td><strong>followedByAny(#name)</strong></td>
+            <td>
+                <p>Appends a new pattern. Other events can occur between a matching event and the previous
+                matching event, and alternative matches will be presented for every alternative matching event
+                (non-deterministic relaxed contiguity):</p>
+{% highlight scala %}
+val followedByAny = start.followedByAny("middle")
+{% endhighlight %}
+            </td>
+         </tr>
+         <tr>
+             <td><strong>followedByAny(#pattern_sequence)</strong></td>
+             <td>
+                 <p>Appends a new pattern. Other events can occur between a sequence of matching events and the previous
+                 matching event, and alternative matches will be presented for every alternative sequence of matching events
+                 (non-deterministic relaxed contiguity):</p>
+{% highlight scala %}
+val followedByAny = start.followedByAny(
+    Pattern.begin[Event]("start").where(...).followedBy("middle").where(...)
+)
+{% endhighlight %}
+             </td>
+         </tr>
 
                 <tr>
                                     <td><strong>notNext()</strong></td>
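
Note on the quantifier comments in the documentation change above: the accepted occurrence counts can be checked with a small standalone model. This is only a sketch and does not use FlinkCEP; `TimesRangeSketch`, `accepts` and `acceptedCounts` are hypothetical names, and it assumes `from >= 1` and that `optional()` adds exactly the zero-occurrence case, as the comments in the docs state.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical standalone model of the quantifier comments; this is not FlinkCEP code.
class TimesRangeSketch {

    // Returns true if `count` occurrences satisfy times(from, to),
    // optionally combined with optional(). Assumes from >= 1.
    static boolean accepts(int from, int to, boolean optional, int count) {
        if (count == 0) {
            return optional;
        }
        return count >= from && count <= to;
    }

    // Lists every accepted count in [0, limit] so it can be compared with the docs.
    static List<Integer> acceptedCounts(int from, int to, boolean optional, int limit) {
        List<Integer> result = new ArrayList<>();
        for (int count = 0; count <= limit; count++) {
            if (accepts(from, to, optional, count)) {
                result.add(count);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(acceptedCounts(2, 4, false, 6)); // times(2, 4): [2, 3, 4]
        System.out.println(acceptedCounts(2, 4, true, 6));  // times(2, 4).optional(): [0, 2, 3, 4]
        System.out.println(acceptedCounts(4, 4, true, 6));  // times(4).optional(): [0, 4]
    }
}
```

The model treats `times(4)` as the range `times(4, 4)`, which is an assumption made for the sketch only.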

http://git-wip-us.apache.org/repos/asf/flink/blob/3096bd03/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/GroupPattern.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/GroupPattern.scala b/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/GroupPattern.scala
new file mode 100644
index 0000000..6210956
--- /dev/null
+++ b/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/GroupPattern.scala
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.cep.scala.pattern
+
+import org.apache.flink.cep.pattern.conditions.IterativeCondition
+import org.apache.flink.cep.pattern.{GroupPattern => JGroupPattern}
+
+/**
+  * Base class for a group pattern definition.
+  *
+  * @param jGroupPattern Underlying Java API GroupPattern
+  * @tparam T Base type of the elements appearing in the pattern
+  * @tparam F Subtype of T to which the current pattern operator is constrained
+  */
+class GroupPattern[T , F <: T](jGroupPattern: JGroupPattern[T, F])
+  extends Pattern[T , F](jGroupPattern) {
+
+  override def where(condition: IterativeCondition[F] ) =
+    throw new UnsupportedOperationException ("GroupPattern does not support where clause.")
+
+  override def or(condition: IterativeCondition[F] ) =
+    throw new UnsupportedOperationException ("GroupPattern does not support or clause.")
+
+  override def subtype[S <: F](clazz: Class[S]) =
+    throw new UnsupportedOperationException("GroupPattern does not support subtype clause.")
+
+}
+
+object GroupPattern {
+
+  /**
+    * Constructs a new GroupPattern by wrapping a given Java API GroupPattern
+    *
+    * @param jGroupPattern Underlying Java API GroupPattern.
+    * @tparam T Base type of the elements appearing in the pattern
+    * @tparam F Subtype of T to which the current pattern operator is constrained
+    * @return New wrapping GroupPattern object
+    */
+  def apply[T, F <: T](jGroupPattern: JGroupPattern[T, F]) = new GroupPattern[T, F](jGroupPattern)
+
+}
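
Note on the new GroupPattern class above: it inherits the full pattern API but rejects `where`, `or` and `subtype`, because conditions belong to the patterns inside the group. A minimal standalone sketch of that design follows. `SketchPattern` and `SketchGroupPattern` are hypothetical stand-ins, not the FlinkCEP classes, and the fixed group name is an assumption.

```java
import java.util.function.Predicate;

// Hypothetical stand-in for a single pattern; not the FlinkCEP Pattern class.
class SketchPattern<T> {
    private final String name;
    private Predicate<T> condition = event -> true;

    SketchPattern(String name) {
        this.name = name;
    }

    String getName() {
        return name;
    }

    // Narrows the pattern to events that also satisfy the extra condition.
    SketchPattern<T> where(Predicate<T> extra) {
        condition = condition.and(extra);
        return this;
    }

    boolean matches(T event) {
        return condition.test(event);
    }
}

// Hypothetical stand-in for a group pattern that wraps a whole pattern sequence.
class SketchGroupPattern<T> extends SketchPattern<T> {
    private final SketchPattern<T> rawPattern;

    SketchGroupPattern(SketchPattern<T> rawPattern) {
        super("group"); // assumed placeholder name for the sketch
        this.rawPattern = rawPattern;
    }

    SketchPattern<T> getRawPattern() {
        return rawPattern;
    }

    // A group has no condition of its own, so adding one is rejected.
    @Override
    SketchPattern<T> where(Predicate<T> extra) {
        throw new UnsupportedOperationException("GroupPattern does not support where clause.");
    }
}
```

Throwing at call time keeps one fluent API for both kinds of pattern, at the cost of moving the error from compile time to run time.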

http://git-wip-us.apache.org/repos/asf/flink/blob/3096bd03/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala b/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala
index 644da5e..4393c99 100644
--- a/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala
+++ b/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala
@@ -418,6 +418,40 @@ class Pattern[T , F <: T](jPattern: JPattern[T, F]) {
     this
   }
 
+  /**
+    * Appends a new group pattern to the existing one. The new pattern enforces non-strict
+    * temporal contiguity. This means that a matching event of this pattern and the
+    * preceding matching event might be interleaved with other events which are ignored.
+    *
+    * @param pattern the pattern to append
+    * @return A new pattern which is appended to this one
+    */
+  def followedBy(pattern: Pattern[T, F]): GroupPattern[T, F] =
+    GroupPattern[T, F](jPattern.followedBy(pattern.wrappedPattern))
+
+  /**
+    * Appends a new group pattern to the existing one. The new pattern enforces non-strict
+    * temporal contiguity (non-deterministic relaxed contiguity): other events may occur in
+    * between, and alternative matches are presented for every alternative matching sequence.
+    *
+    * @param pattern the pattern to append
+    * @return A new pattern which is appended to this one
+    */
+  def followedByAny(pattern: Pattern[T, F]): GroupPattern[T, F] =
+    GroupPattern[T, F](jPattern.followedByAny(pattern.wrappedPattern))
+
+  /**
+    * Appends a new group pattern to the existing one. The new pattern enforces strict
+    * temporal contiguity. This means that the whole pattern sequence matches only
+    * if an event which matches this pattern directly follows the preceding matching
+    * event. Thus, there cannot be any events in between two matching events.
+    *
+    * @param pattern the pattern to append
+    * @return A new pattern which is appended to this one
+    */
+  def next(pattern: Pattern[T, F]): GroupPattern[T, F] =
+    GroupPattern[T, F](jPattern.next(pattern.wrappedPattern))
+
 }
 
 object Pattern {
@@ -442,4 +476,13 @@ object Pattern {
     */
   def begin[X](name: String): Pattern[X, X] = Pattern(JPattern.begin(name))
 
+  /**
+    * Starts a new pattern sequence. The provided pattern is the initial pattern
+    * of the new sequence.
+    *
+    * @param pattern the pattern to begin with
+    * @return the first pattern of a pattern sequence
+    */
+  def begin[T, F <: T](pattern: Pattern[T, F]): GroupPattern[T, F] =
+    GroupPattern[T, F](JPattern.begin(pattern.wrappedPattern))
 }
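
Note on the overloads above: because a group pattern nests a whole pattern sequence, the NFACompiler change in this commit checks pattern names recursively, descending into each group's raw pattern. The traversal can be sketched on its own as below. `NamedNode` and `NameUniquenessSketch` are hypothetical, and `IllegalStateException` stands in for the `MalformedPatternException` used by the real compiler.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical model of a pattern chain; not the FlinkCEP Pattern class.
class NamedNode {
    final String name;        // pattern name; null for a group
    final NamedNode previous; // preceding pattern in the same sequence, or null
    final NamedNode raw;      // last pattern of the nested sequence for a group, else null

    private NamedNode(String name, NamedNode previous, NamedNode raw) {
        this.name = name;
        this.previous = previous;
        this.raw = raw;
    }

    static NamedNode single(String name, NamedNode previous) {
        return new NamedNode(name, previous, null);
    }

    static NamedNode group(NamedNode raw, NamedNode previous) {
        return new NamedNode(null, previous, raw);
    }
}

class NameUniquenessSketch {

    // Walks the chain from the last pattern backwards, descending into groups.
    static void check(NamedNode last) {
        Set<String> seen = new HashSet<>();
        for (NamedNode node = last; node != null; node = node.previous) {
            checkNode(node, seen);
        }
    }

    private static void checkNode(NamedNode node, Set<String> seen) {
        if (node.raw != null) {
            // A group contributes the names of its nested patterns, not a name of its own.
            for (NamedNode inner = node.raw; inner != null; inner = inner.previous) {
                checkNode(inner, seen);
            }
        } else if (!seen.add(node.name)) {
            throw new IllegalStateException("Duplicate pattern name: " + node.name);
        }
    }
}
```

Under this model a name used inside a group collides with the same name used outside it, so nested sequences need names that are unique across the whole pattern.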

http://git-wip-us.apache.org/repos/asf/flink/blob/3096bd03/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
index c28390e..e160e4a 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
@@ -25,6 +25,7 @@ import org.apache.flink.cep.nfa.NFA;
 import org.apache.flink.cep.nfa.State;
 import org.apache.flink.cep.nfa.StateTransition;
 import org.apache.flink.cep.nfa.StateTransitionAction;
+import org.apache.flink.cep.pattern.GroupPattern;
 import org.apache.flink.cep.pattern.MalformedPatternException;
 import org.apache.flink.cep.pattern.Pattern;
 import org.apache.flink.cep.pattern.Quantifier;
@@ -112,6 +113,8 @@ public class NFACompiler {
 		private final List<State<T>> states = new ArrayList<>();
 
 		private long windowTime = 0;
+		private GroupPattern<T, ?> currentGroupPattern;
+		private Map<GroupPattern<T, ?>, Boolean> firstOfLoopMap = new HashMap<>();
 		private Pattern<T, ?> currentPattern;
 		private Pattern<T, ?> followingPattern;
 
@@ -128,6 +131,8 @@ public class NFACompiler {
 				throw new MalformedPatternException("NotFollowedBy is not supported as a last part of a Pattern!");
 			}
 
+			checkPatternNameUniqueness();
+
 			// we're traversing the pattern from the end to the beginning --> the first state is the final state
 			State<T> sinkState = createEndingState();
 			// add all the normal states
@@ -145,6 +150,39 @@ public class NFACompiler {
 		}
 
 		/**
+		 * Check if there are duplicate pattern names. If yes, it
+		 * throws a {@link MalformedPatternException}.
+		 */
+		private void checkPatternNameUniqueness() {
+			// make sure there is no pattern with name "$endState$"
+			stateNameHandler.checkNameUniqueness(ENDING_STATE_NAME);
+			Pattern patternToCheck = currentPattern;
+			while (patternToCheck != null) {
+				checkPatternNameUniqueness(patternToCheck);
+				patternToCheck = patternToCheck.getPrevious();
+			}
+			stateNameHandler.clear();
+		}
+
+		/**
+		 * Check if the given pattern's name is already used or not. If yes, it
+		 * throws a {@link MalformedPatternException}.
+		 *
+		 * @param pattern The pattern to be checked
+		 */
+		private void checkPatternNameUniqueness(final Pattern pattern) {
+			if (pattern instanceof GroupPattern) {
+				Pattern patternToCheck = ((GroupPattern) pattern).getRawPattern();
+				while (patternToCheck != null) {
+					checkPatternNameUniqueness(patternToCheck);
+					patternToCheck = patternToCheck.getPrevious();
+				}
+			} else {
+				stateNameHandler.checkNameUniqueness(pattern.getName());
+			}
+		}
+
+		/**
 		 * Retrieves list of conditions resulting in Stop state and names of the corresponding NOT patterns.
 		 *
 		 * <p>A current not condition can be produced in two cases:
@@ -169,7 +207,7 @@ public class NFACompiler {
 				previousPattern = previousPattern.getPrevious();
 
 				if (previousPattern.getQuantifier().getConsumingStrategy() == Quantifier.ConsumingStrategy.NOT_FOLLOW) {
-					final IterativeCondition<T> notCondition = (IterativeCondition<T>) previousPattern.getCondition();
+					final IterativeCondition<T> notCondition = getTakeCondition(previousPattern);
 					notConditions.add(Tuple2.of(notCondition, previousPattern.getName()));
 				}
 			}
@@ -199,10 +237,8 @@ public class NFACompiler {
 				if (currentPattern.getQuantifier().getConsumingStrategy() == Quantifier.ConsumingStrategy.NOT_FOLLOW) {
 					//skip notFollow patterns, they are converted into edge conditions
 				} else if (currentPattern.getQuantifier().getConsumingStrategy() == Quantifier.ConsumingStrategy.NOT_NEXT) {
-					stateNameHandler.checkNameUniqueness(currentPattern.getName());
-
 					final State<T> notNext = createState(currentPattern.getName(), State.StateType.Normal);
-					final IterativeCondition<T> notCondition = (IterativeCondition<T>) currentPattern.getCondition();
+					final IterativeCondition<T> notCondition = getTakeCondition(currentPattern);
 					final State<T> stopState = createStopState(notCondition, currentPattern.getName());
 
 					if (lastSink.isFinal()) {
@@ -214,7 +250,6 @@ public class NFACompiler {
 					notNext.addProceed(stopState, notCondition);
 					lastSink = notNext;
 				} else {
-					stateNameHandler.checkNameUniqueness(currentPattern.getName());
 					lastSink = convertPattern(lastSink);
 				}
 
@@ -239,7 +274,6 @@ public class NFACompiler {
 		 */
 		@SuppressWarnings("unchecked")
 		private State<T> createStartState(State<T> sinkState) {
-			stateNameHandler.checkNameUniqueness(currentPattern.getName());
 			final State<T> beginningState = convertPattern(sinkState);
 			beginningState.makeStart();
 			return beginningState;
@@ -252,9 +286,11 @@ public class NFACompiler {
 			if (quantifier.hasProperty(Quantifier.QuantifierProperty.LOOPING)) {
 
 				// if loop has started then all notPatterns previous to the optional states are no longer valid
+				setCurrentGroupPatternFirstOfLoop(false);
 				final State<T> sink = copyWithoutTransitiveNots(sinkState);
 				final State<T> looping = createLooping(sink);
 
+				setCurrentGroupPatternFirstOfLoop(true);
 				if (!quantifier.hasProperty(Quantifier.QuantifierProperty.OPTIONAL)) {
 					lastSink = createInitMandatoryStateOfOneOrMore(looping);
 				} else {
@@ -360,7 +396,7 @@ public class NFACompiler {
 		private void addStopStateToLooping(final State<T> loopingState) {
 			if (followingPattern != null &&
 					followingPattern.getQuantifier().getConsumingStrategy() == Quantifier.ConsumingStrategy.NOT_FOLLOW) {
-				final IterativeCondition<T> notCondition = (IterativeCondition<T>) followingPattern.getCondition();
+				final IterativeCondition<T> notCondition = getTakeCondition(followingPattern);
 				final State<T> stopState = createStopState(notCondition, followingPattern.getName());
 				loopingState.addProceed(stopState, notCondition);
 			}
@@ -376,24 +412,76 @@ public class NFACompiler {
 		 */
 		private State<T> createTimesState(final State<T> sinkState, Times times) {
 			State<T> lastSink = sinkState;
+			setCurrentGroupPatternFirstOfLoop(false);
+			final IterativeCondition<T> takeCondition = getTakeCondition(currentPattern);
 			final IterativeCondition<T> innerIgnoreCondition = getInnerIgnoreCondition(currentPattern);
 			for (int i = times.getFrom(); i < times.getTo(); i++) {
-				lastSink = createSingletonState(lastSink, sinkState, innerIgnoreCondition, true);
+				lastSink = createSingletonState(lastSink, sinkState, takeCondition, innerIgnoreCondition, true);
 				addStopStateToLooping(lastSink);
 			}
 			for (int i = 0; i < times.getFrom() - 1; i++) {
-				lastSink = createSingletonState(lastSink, null, innerIgnoreCondition, false);
+				lastSink = createSingletonState(lastSink, null, takeCondition, innerIgnoreCondition, false);
 				addStopStateToLooping(lastSink);
 			}
 			// we created the intermediate states in the loop, now we create the start of the loop.
+			setCurrentGroupPatternFirstOfLoop(true);
 			return createSingletonState(
 				lastSink,
 				sinkState,
+				takeCondition,
 				getIgnoreCondition(currentPattern),
 				currentPattern.getQuantifier().hasProperty(Quantifier.QuantifierProperty.OPTIONAL));
 		}
 
 		/**
+		 * Records whether the current group pattern is the head of a TIMES/LOOPING quantifier.
+		 *
+		 * @param isFirstOfLoop whether the current group pattern is the head of the TIMES/LOOPING quantifier
+		 */
+		@SuppressWarnings("unchecked")
+		private void setCurrentGroupPatternFirstOfLoop(boolean isFirstOfLoop) {
+			if (currentPattern instanceof GroupPattern) {
+				firstOfLoopMap.put((GroupPattern<T, ?>) currentPattern, isFirstOfLoop);
+			}
+		}
+
+		/**
+		 * Checks if the current group pattern is the head of a TIMES/LOOPING quantifier. A group pattern
+		 * that was never marked (i.e. it has no such quantifier) is treated as the head.
+		 */
+		private boolean isCurrentGroupPatternFirstOfLoop() {
+			if (firstOfLoopMap.containsKey(currentGroupPattern)) {
+				return firstOfLoopMap.get(currentGroupPattern);
+			} else {
+				return true;
+			}
+		}
+
+		/**
+		 * Checks if the given pattern is the head pattern of the current group pattern.
+		 *
+		 * @param pattern the pattern to be checked
+		 * @return {@code true} iff the given pattern is in a group pattern and it is the head pattern of the
+		 * group pattern, {@code false} otherwise
+		 */
+		private boolean headOfGroup(Pattern<T, ?> pattern) {
+			return currentGroupPattern != null && pattern.getPrevious() == null;
+		}
+
+		/**
+		 * Checks if the given pattern is optional. If the given pattern is the head of a group pattern,
+		 * the optional status depends on the group pattern.
+		 */
+		private boolean isPatternOptional(Pattern<T, ?> pattern) {
+			if (headOfGroup(pattern)) {
+				return isCurrentGroupPatternFirstOfLoop() &&
+					currentGroupPattern.getQuantifier().hasProperty(Quantifier.QuantifierProperty.OPTIONAL);
+			} else {
+				return pattern.getQuantifier().hasProperty(Quantifier.QuantifierProperty.OPTIONAL);
+			}
+		}
+
+		/**
 		 * Creates a simple single state. For an OPTIONAL state it also consists
 		 * of a similar state without the PROCEED edge, so that for each PROCEED transition branches
 		 * in computation state graph  can be created only once.
@@ -406,8 +494,9 @@ public class NFACompiler {
 			return createSingletonState(
 				sinkState,
 				sinkState,
+				getTakeCondition(currentPattern),
 				getIgnoreCondition(currentPattern),
-				currentPattern.getQuantifier().hasProperty(Quantifier.QuantifierProperty.OPTIONAL));
+				isPatternOptional(currentPattern));
 		}
 
 		/**
@@ -424,17 +513,23 @@ public class NFACompiler {
 		@SuppressWarnings("unchecked")
 		private State<T> createSingletonState(final State<T> sinkState,
 			final State<T> proceedState,
+			final IterativeCondition<T> takeCondition,
 			final IterativeCondition<T> ignoreCondition,
 			final boolean isOptional) {
-			final IterativeCondition<T> currentCondition = (IterativeCondition<T>) currentPattern.getCondition();
-			final IterativeCondition<T> trueFunction = BooleanConditions.trueFunction();
+			if (currentPattern instanceof GroupPattern) {
+				return createGroupPatternState((GroupPattern) currentPattern, sinkState, proceedState, isOptional);
+			}
+
+			final IterativeCondition<T> trueFunction = getTrueFunction();
 
 			final State<T> singletonState = createState(currentPattern.getName(), State.StateType.Normal);
 			// if event is accepted then all notPatterns previous to the optional states are no longer valid
 			final State<T> sink = copyWithoutTransitiveNots(sinkState);
-			singletonState.addTake(sink, currentCondition);
+			singletonState.addTake(sink, takeCondition);
 
-			if (isOptional) {
+			// for the first state of a group pattern, its PROCEED edge should point to the following state of
+			// that group pattern and the edge will be added at the end of creating the NFA for that group pattern
+			if (isOptional && !headOfGroup(currentPattern)) {
 				// if no element accepted the previous nots are still valid.
 				singletonState.addProceed(proceedState, trueFunction);
 			}
@@ -443,7 +538,7 @@ public class NFACompiler {
 				final State<T> ignoreState;
 				if (isOptional) {
 					ignoreState = createState(currentPattern.getName(), State.StateType.Normal);
-					ignoreState.addTake(sink, currentCondition);
+					ignoreState.addTake(sink, takeCondition);
 					ignoreState.addIgnore(ignoreCondition);
 					addStopStates(ignoreState);
 				} else {
@@ -455,6 +550,70 @@ public class NFACompiler {
 		}
 
 		/**
+		 * Create all the states for the group pattern.
+		 *
+		 * @param groupPattern the group pattern to create the states for
+		 * @param sinkState the state that the group pattern being converted should point to
+		 * @param proceedState the state that the group pattern being converted should proceed to
+		 * @param isOptional whether the group pattern being converted is optional
+		 * @return the first state of the states of the group pattern
+		 */
+		private State<T> createGroupPatternState(
+			final GroupPattern<T, ?> groupPattern,
+			final State<T> sinkState,
+			final State<T> proceedState,
+			final boolean isOptional) {
+			final IterativeCondition<T> trueFunction = getTrueFunction();
+
+			Pattern<T, ?> oldCurrentPattern = currentPattern;
+			Pattern<T, ?> oldFollowingPattern = followingPattern;
+			GroupPattern<T, ?> oldGroupPattern = currentGroupPattern;
+			State<T> lastSink = sinkState;
+			currentGroupPattern = groupPattern;
+			currentPattern = groupPattern.getRawPattern();
+			lastSink = createMiddleStates(lastSink);
+			lastSink = convertPattern(lastSink);
+			if (isOptional) {
+				// for the first state of a group pattern, its PROCEED edge should point to
+				// the following state of that group pattern
+				lastSink.addProceed(proceedState, trueFunction);
+			}
+			currentPattern = oldCurrentPattern;
+			followingPattern = oldFollowingPattern;
+			currentGroupPattern = oldGroupPattern;
+			return lastSink;
+		}
+
+		/**
+		 * Create the states for the group pattern as a looping one.
+		 *
+		 * @param groupPattern the group pattern to create the states for
+		 * @param sinkState the state that the group pattern being converted should point to
+		 * @return the first state of the states of the group pattern
+		 */
+		private State<T> createLoopingGroupPatternState(
+			final GroupPattern<T, ?> groupPattern,
+			final State<T> sinkState) {
+			final IterativeCondition<T> trueFunction = getTrueFunction();
+
+			Pattern<T, ?> oldCurrentPattern = currentPattern;
+			Pattern<T, ?> oldFollowingPattern = followingPattern;
+			GroupPattern<T, ?> oldGroupPattern = currentGroupPattern;
+			final State<T> dummyState = createState(currentPattern.getName(), State.StateType.Normal);
+			State<T> lastSink = dummyState;
+			currentGroupPattern = groupPattern;
+			currentPattern = groupPattern.getRawPattern();
+			lastSink = createMiddleStates(lastSink);
+			lastSink = convertPattern(lastSink);
+			lastSink.addProceed(sinkState, trueFunction);
+			dummyState.addProceed(lastSink, trueFunction);
+			currentPattern = oldCurrentPattern;
+			followingPattern = oldFollowingPattern;
+			currentGroupPattern = oldGroupPattern;
+			return lastSink;
+		}
+
+		/**
 		 * Creates the given state as a looping one. Looping state is one with TAKE edge to itself and
 		 * PROCEED edge to the sinkState. It also consists of a similar state without the PROCEED edge, so that
 		 * for each PROCEED transition branches in computation state graph  can be created only once.
@@ -464,16 +623,19 @@ public class NFACompiler {
 		 */
 		@SuppressWarnings("unchecked")
 		private State<T> createLooping(final State<T> sinkState) {
+			if (currentPattern instanceof GroupPattern) {
+				return createLoopingGroupPatternState((GroupPattern) currentPattern, sinkState);
+			}
 			final IterativeCondition<T> untilCondition = (IterativeCondition<T>) currentPattern.getUntilCondition();
 
 			final IterativeCondition<T> ignoreCondition = extendWithUntilCondition(
 				getInnerIgnoreCondition(currentPattern),
 				untilCondition);
 			final IterativeCondition<T> takeCondition = extendWithUntilCondition(
-				(IterativeCondition<T>) currentPattern.getCondition(),
+				getTakeCondition(currentPattern),
 				untilCondition);
 
-			final IterativeCondition<T> proceedCondition = BooleanConditions.trueFunction();
+			final IterativeCondition<T> proceedCondition = getTrueFunction();
 			final State<T> loopingState = createState(currentPattern.getName(), State.StateType.Normal);
 			loopingState.addProceed(sinkState, proceedCondition);
 			loopingState.addTake(takeCondition);
@@ -501,18 +663,13 @@ public class NFACompiler {
 		@SuppressWarnings("unchecked")
 		private State<T> createInitMandatoryStateOfOneOrMore(final State<T> sinkState) {
 			final IterativeCondition<T> takeCondition = extendWithUntilCondition(
-				(IterativeCondition<T>) currentPattern.getCondition(),
+				getTakeCondition(currentPattern),
 				(IterativeCondition<T>) currentPattern.getUntilCondition()
 			);
 
-			final State<T> firstState = createState(currentPattern.getName(), State.StateType.Normal);
-			firstState.addTake(sinkState, takeCondition);
-
 			final IterativeCondition<T> ignoreCondition = getIgnoreCondition(currentPattern);
-			if (ignoreCondition != null) {
-				firstState.addIgnore(ignoreCondition);
-			}
-			return firstState;
+
+			return createSingletonState(sinkState, null, takeCondition, ignoreCondition, false);
 		}
 
 		/**
@@ -525,24 +682,13 @@ public class NFACompiler {
 		@SuppressWarnings("unchecked")
 		private State<T> createInitOptionalStateOfZeroOrMore(final State<T> loopingState, final State<T> lastSink) {
 			final IterativeCondition<T> takeCondition = extendWithUntilCondition(
-				(IterativeCondition<T>) currentPattern.getCondition(),
+				getTakeCondition(currentPattern),
 				(IterativeCondition<T>) currentPattern.getUntilCondition()
 			);
 
-			final State<T> firstState = createState(currentPattern.getName(), State.StateType.Normal);
-			firstState.addProceed(lastSink, BooleanConditions.<T>trueFunction());
-			firstState.addTake(loopingState, takeCondition);
-
 			final IterativeCondition<T> ignoreFunction = getIgnoreCondition(currentPattern);
-			if (ignoreFunction != null) {
-				final State<T> firstStateWithoutProceed = createState(currentPattern.getName(), State.StateType.Normal);
-				firstState.addIgnore(firstStateWithoutProceed, ignoreFunction);
-				firstStateWithoutProceed.addIgnore(ignoreFunction);
-				firstStateWithoutProceed.addTake(loopingState, takeCondition);
 
-				addStopStates(firstStateWithoutProceed);
-			}
-			return firstState;
+			return createSingletonState(loopingState, lastSink, takeCondition, ignoreFunction, true);
 		}
 
 		/**
@@ -567,37 +713,107 @@ public class NFACompiler {
 
 		/**
 		 * @return The {@link IterativeCondition condition} for the {@code IGNORE} edge
-		 * that corresponds to the specified {@link Pattern}. It is applicable only for inner states of a complex
-		 * state like looping or times.
+		 * that corresponds to the specified {@link Pattern} and extended with stop(until) condition
+		 * if necessary. It is applicable only for inner states of a complex state like looping or times.
 		 */
 		@SuppressWarnings("unchecked")
 		private IterativeCondition<T> getInnerIgnoreCondition(Pattern<T, ?> pattern) {
-			switch (pattern.getQuantifier().getInnerConsumingStrategy()) {
+			Quantifier.ConsumingStrategy consumingStrategy = pattern.getQuantifier().getInnerConsumingStrategy();
+			if (headOfGroup(pattern)) {
+				// for the head pattern of a group pattern, we should consider the
+				// inner consuming strategy of the group pattern
+				consumingStrategy = currentGroupPattern.getQuantifier().getInnerConsumingStrategy();
+			}
+
+			IterativeCondition<T> innerIgnoreCondition = null;
+			switch (consumingStrategy) {
 				case STRICT:
-					return null;
+					innerIgnoreCondition = null;
+					break;
 				case SKIP_TILL_NEXT:
-					return new NotCondition<>((IterativeCondition<T>) pattern.getCondition());
+					innerIgnoreCondition = new NotCondition<>((IterativeCondition<T>) pattern.getCondition());
+					break;
 				case SKIP_TILL_ANY:
-					return BooleanConditions.trueFunction();
+					innerIgnoreCondition = BooleanConditions.trueFunction();
+					break;
+			}
+
+			if (currentGroupPattern != null && currentGroupPattern.getUntilCondition() != null) {
+				innerIgnoreCondition = extendWithUntilCondition(
+					innerIgnoreCondition,
+					(IterativeCondition<T>) currentGroupPattern.getUntilCondition());
 			}
-			return null;
+			return innerIgnoreCondition;
 		}
 
 		/**
 		 * @return The {@link IterativeCondition condition} for the {@code IGNORE} edge
-		 * that corresponds to the specified {@link Pattern}. For more on strategy see {@link Quantifier}
+		 * that corresponds to the specified {@link Pattern} and extended with
+		 * stop(until) condition if necessary. For more on strategy see {@link Quantifier}
 		 */
 		@SuppressWarnings("unchecked")
 		private IterativeCondition<T> getIgnoreCondition(Pattern<T, ?> pattern) {
-			switch (pattern.getQuantifier().getConsumingStrategy()) {
+			Quantifier.ConsumingStrategy consumingStrategy = pattern.getQuantifier().getConsumingStrategy();
+			if (headOfGroup(pattern)) {
+				// for the head pattern of a group pattern, we should consider the inner consuming strategy
+				// of the group pattern if the group pattern is not the head of the TIMES/LOOPING quantifier;
+				// otherwise, we should consider the consuming strategy of the group pattern
+				if (isCurrentGroupPatternFirstOfLoop()) {
+					consumingStrategy = currentGroupPattern.getQuantifier().getConsumingStrategy();
+				} else {
+					consumingStrategy = currentGroupPattern.getQuantifier().getInnerConsumingStrategy();
+				}
+			}
+
+			IterativeCondition<T> ignoreCondition = null;
+			switch (consumingStrategy) {
 				case STRICT:
-					return null;
+					ignoreCondition = null;
+					break;
 				case SKIP_TILL_NEXT:
-					return new NotCondition<>((IterativeCondition<T>) pattern.getCondition());
+					ignoreCondition = new NotCondition<>((IterativeCondition<T>) pattern.getCondition());
+					break;
 				case SKIP_TILL_ANY:
-					return BooleanConditions.trueFunction();
+					ignoreCondition = BooleanConditions.trueFunction();
+					break;
+			}
+
+			if (currentGroupPattern != null && currentGroupPattern.getUntilCondition() != null) {
+				ignoreCondition = extendWithUntilCondition(
+					ignoreCondition,
+					(IterativeCondition<T>) currentGroupPattern.getUntilCondition());
+			}
+			return ignoreCondition;
+		}
+
+		/**
+		 * @return the {@link IterativeCondition condition} for the {@code TAKE} edge
+		 * that corresponds to the specified {@link Pattern} and extended with
+		 * stop(until) condition if necessary.
+		 */
+		@SuppressWarnings("unchecked")
+		private IterativeCondition<T> getTakeCondition(Pattern<T, ?> pattern) {
+			IterativeCondition<T> takeCondition = (IterativeCondition<T>) pattern.getCondition();
+			if (currentGroupPattern != null && currentGroupPattern.getUntilCondition() != null) {
+				takeCondition = extendWithUntilCondition(
+					takeCondition,
+					(IterativeCondition<T>) currentGroupPattern.getUntilCondition());
+			}
+			return takeCondition;
+		}
+
+		/**
+		 * @return A true function extended with the stop (until) condition if necessary.
+		 */
+		@SuppressWarnings("unchecked")
+		private IterativeCondition<T> getTrueFunction() {
+			IterativeCondition<T> trueCondition = BooleanConditions.trueFunction();
+			if (currentGroupPattern != null && currentGroupPattern.getUntilCondition() != null) {
+				trueCondition = extendWithUntilCondition(
+					trueCondition,
+					(IterativeCondition<T>) currentGroupPattern.getUntilCondition());
 			}
-			return null;
+			return trueCondition;
 		}
 	}
 

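Note on the NFACompiler change above: the ignore-condition selection and the
until extension can be sketched without Flink as follows. This is only an
illustration under stated assumptions, not the committed code. The class
IgnoreConditionSketch and its method names are hypothetical, java.util.function
predicates stand in for IterativeCondition, and the body of
extendWithUntilCondition is not part of this diff, so the rule used here
("condition AND NOT until", with a missing condition staying missing) is an
assumption.

```java
import java.util.function.Predicate;

// Hypothetical stand-in for the condition handling in NFACompiler; not Flink code.
final class IgnoreConditionSketch {

    enum ConsumingStrategy { STRICT, SKIP_TILL_NEXT, SKIP_TILL_ANY }

    // ASSUMPTION: the extended condition holds only while the until condition
    // has not fired. A null condition (no edge) is kept as null in this sketch.
    static <T> Predicate<T> extendWithUntil(Predicate<T> condition, Predicate<T> until) {
        if (condition == null || until == null) {
            return condition;
        }
        return until.negate().and(condition);
    }

    // Mirrors the switch in getIgnoreCondition: the consuming strategy picks
    // the base IGNORE condition, which is then extended with the until condition.
    static <T> Predicate<T> ignoreCondition(
            ConsumingStrategy strategy, Predicate<T> take, Predicate<T> until) {
        Predicate<T> ignore;
        switch (strategy) {
            case SKIP_TILL_NEXT:
                ignore = take.negate();   // skip only events that do not match
                break;
            case SKIP_TILL_ANY:
                ignore = event -> true;   // any event may be skipped
                break;
            default:
                ignore = null;            // STRICT: no IGNORE edge at all
                break;
        }
        return extendWithUntil(ignore, until);
    }
}
```

In this sketch STRICT yields no IGNORE condition, SKIP_TILL_NEXT ignores only
non-matching events, and SKIP_TILL_ANY ignores everything, each limited to
events for which the until condition is still false.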
http://git-wip-us.apache.org/repos/asf/flink/blob/3096bd03/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFAStateNameHandler.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFAStateNameHandler.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFAStateNameHandler.java
index 558b6f4..9766bff 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFAStateNameHandler.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFAStateNameHandler.java
@@ -55,6 +55,14 @@ public class NFAStateNameHandler {
 		if (usedNames.contains(name)) {
 			throw new MalformedPatternException("Duplicate pattern name: " + name + ". Names must be unique.");
 		}
+		usedNames.add(name);
+	}
+
+	/**
+	 * Clears the names recorded while checking name uniqueness.
+	 */
+	public void clear() {
+		usedNames.clear();
 	}
 
 	/**

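Note on the NFAStateNameHandler change above: the check now records every name
it accepts, and clear() resets that record. A minimal stand-alone sketch of the
behaviour, assuming a plain HashSet behind usedNames; the class
NameRegistrySketch, the method name checkNameUniqueness and the use of
IllegalArgumentException (in place of MalformedPatternException) are
hypothetical choices for this illustration:

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical stand-in for the name bookkeeping shown in the hunk; not Flink code.
final class NameRegistrySketch {

    private final Set<String> usedNames = new HashSet<>();

    // Rejects a name that was already seen, otherwise records it.
    void checkNameUniqueness(String name) {
        if (usedNames.contains(name)) {
            throw new IllegalArgumentException(
                "Duplicate pattern name: " + name + ". Names must be unique.");
        }
        usedNames.add(name);
    }

    // Forgets all names recorded during the uniqueness check.
    void clear() {
        usedNames.clear();
    }
}
```

Without the add() call a duplicate could never be detected by this method
alone; clear() lets the same handler be reused once the check has finished.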
http://git-wip-us.apache.org/repos/asf/flink/blob/3096bd03/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/GroupPattern.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/GroupPattern.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/GroupPattern.java
new file mode 100644
index 0000000..a20d377
--- /dev/null
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/GroupPattern.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.pattern;
+
+import org.apache.flink.cep.pattern.conditions.IterativeCondition;
+
+/**
+ * Base class for a group pattern definition.
+ *
+ * @param <T> Base type of the elements appearing in the pattern
+ * @param <F> Subtype of T to which the current pattern operator is constrained
+ */
+public class GroupPattern<T, F extends T> extends Pattern<T, F> {
+
+	/** Group pattern representing the pattern definition of this group. */
+	private final Pattern<T, ? extends T> groupPattern;
+
+	GroupPattern(final Pattern<T, ? extends T> previous, final Pattern<T, ? extends T> groupPattern) {
+		super("GroupPattern", previous);
+		this.groupPattern = groupPattern;
+	}
+
+	GroupPattern(
+		final Pattern<T, ? extends T> previous,
+		final Pattern<T, ? extends T> groupPattern,
+		final Quantifier.ConsumingStrategy consumingStrategy) {
+		super("GroupPattern", previous, consumingStrategy);
+		this.groupPattern = groupPattern;
+	}
+
+	@Override
+	public Pattern<T, F> where(IterativeCondition<F> condition) {
+		throw new UnsupportedOperationException("GroupPattern does not support where clause.");
+	}
+
+	@Override
+	public Pattern<T, F> or(IterativeCondition<F> condition) {
+		throw new UnsupportedOperationException("GroupPattern does not support or clause.");
+	}
+
+	@Override
+	public <S extends F> Pattern<T, S> subtype(final Class<S> subtypeClass) {
+		throw new UnsupportedOperationException("GroupPattern does not support subtype clause.");
+	}
+
+	public Pattern<T, ? extends T> getRawPattern() {
+		return groupPattern;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3096bd03/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
index 1131318..f4d3404 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
@@ -430,6 +430,54 @@ public class Pattern<T, F extends T> {
 		return this;
 	}
 
+	/**
+	 * Starts a new pattern sequence. The provided pattern is the initial pattern
+	 * of the new sequence.
+	 *
+	 * @param group the pattern to begin with
+	 * @return the first pattern of a pattern sequence
+	 */
+	public static <T, F extends T> GroupPattern<T, F> begin(Pattern<T, F> group) {
+		return new GroupPattern<>(null, group);
+	}
+
+	/**
+	 * Appends a new group pattern to the existing one. The new pattern enforces non-strict
+	 * temporal contiguity. This means that a matching event of this pattern and the
+	 * preceding matching event might be interleaved with other events which are ignored.
+	 *
+	 * @param group the pattern to append
+	 * @return A new pattern which is appended to this one
+	 */
+	public GroupPattern<T, F> followedBy(Pattern<T, F> group) {
+		return new GroupPattern<>(this, group, ConsumingStrategy.SKIP_TILL_NEXT);
+	}
+
+	/**
+	 * Appends a new group pattern to the existing one. The new pattern enforces non-deterministic
+	 * relaxed temporal contiguity. This means that events between the preceding matching event and
+	 * a matching event of this pattern may be ignored, including events that would themselves match.
+	 *
+	 * @param group the pattern to append
+	 * @return A new pattern which is appended to this one
+	 */
+	public GroupPattern<T, F> followedByAny(Pattern<T, F> group) {
+		return new GroupPattern<>(this, group, ConsumingStrategy.SKIP_TILL_ANY);
+	}
+
+	/**
+	 * Appends a new group pattern to the existing one. The new pattern enforces strict
+	 * temporal contiguity. This means that the whole pattern sequence matches only
+	 * if an event which matches this pattern directly follows the preceding matching
+	 * event. Thus, there cannot be any events in between two matching events.
+	 *
+	 * @param group the pattern to append
+	 * @return A new pattern which is appended to this one
+	 */
+	public GroupPattern<T, F> next(Pattern<T, F> group) {
+		return new GroupPattern<>(this, group, ConsumingStrategy.STRICT);
+	}
+
 	private void checkIfNoNotPattern() {
 		if (quantifier.getConsumingStrategy() == ConsumingStrategy.NOT_FOLLOW ||
 				quantifier.getConsumingStrategy() == ConsumingStrategy.NOT_NEXT) {

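Note on the three consuming strategies used by next, followedBy and
followedByAny above: the toy matcher below shows how they differ for a two-step
sequence. It is a simplified model and not how the NFA evaluates patterns. The
class ContiguitySketch, the prefix-based conditions and the string output
format are all hypothetical, and for a group pattern the strategy would govern
the gap in front of the group's first pattern.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of STRICT / SKIP_TILL_NEXT / SKIP_TILL_ANY; not Flink code.
final class ContiguitySketch {

    enum ConsumingStrategy { STRICT, SKIP_TILL_NEXT, SKIP_TILL_ANY }

    // Finds "first second" pairs where an event starting with firstPrefix is
    // followed by an event starting with secondPrefix under the given strategy.
    static List<String> match(
            List<String> events,
            String firstPrefix,
            String secondPrefix,
            ConsumingStrategy strategy) {
        List<String> matches = new ArrayList<>();
        for (int i = 0; i < events.size(); i++) {
            if (!events.get(i).startsWith(firstPrefix)) {
                continue;
            }
            for (int j = i + 1; j < events.size(); j++) {
                boolean hit = events.get(j).startsWith(secondPrefix);
                if (hit) {
                    matches.add(events.get(i) + " " + events.get(j));
                }
                if (strategy == ConsumingStrategy.STRICT) {
                    break; // only the directly following event is considered
                }
                if (hit && strategy == ConsumingStrategy.SKIP_TILL_NEXT) {
                    break; // stop at the first matching event
                }
                // SKIP_TILL_ANY keeps scanning and may produce further matches
            }
        }
        return matches;
    }
}
```

For the events a, c, b1, b2 this model gives no match under STRICT, the single
match "a b1" under SKIP_TILL_NEXT, and both "a b1" and "a b2" under
SKIP_TILL_ANY.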
http://git-wip-us.apache.org/repos/asf/flink/blob/3096bd03/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java
index 504fec0..c1893b4 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java
@@ -67,14 +67,14 @@ public class Quantifier {
 		return properties.contains(property);
 	}
 
-	public ConsumingStrategy getConsumingStrategy() {
-		return consumingStrategy;
-	}
-
 	public ConsumingStrategy getInnerConsumingStrategy() {
 		return innerConsumingStrategy;
 	}
 
+	public ConsumingStrategy getConsumingStrategy() {
+		return consumingStrategy;
+	}
+
 	private static void checkPattern(boolean condition, Object errorMessage) {
 		if (!condition) {
 			throw new MalformedPatternException(String.valueOf(errorMessage));

