flink-commits mailing list archives

From trohrm...@apache.org
Subject [2/2] flink git commit: [FLINK-3317] [cep] Introduce timeout handler to CEP operator
Date Thu, 16 Jun 2016 14:49:29 GMT
[FLINK-3317] [cep] Introduce timeout handler to CEP operator

Introduce timeout handling flag for the NFACompiler

Expose timeout handling via Java API

Update documentation of PatternStream and CEP

Introduce timeout select function to CEP Scala API

Add select and flatSelect with timeout support to CEP Scala API

Add test cases for timeout handling

Update documentation

Fix CEP Scala API completeness test

This closes #2041.
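For readers skimming the change: the core of the new operator contract is that a complete match is passed to the select function and wrapped in `Either.Right`, while a timed-out partial match is passed to the timeout function together with the timeout timestamp and wrapped in `Either.Left`. The sketch below mirrors that routing in plain Java with a hypothetical minimal `Either` standing in for `org.apache.flink.types.Either`; all names are illustrative, not Flink API.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Function;

public class TimeoutRoutingSketch {

    // Minimal stand-in for org.apache.flink.types.Either:
    // Left carries a timeout result, Right carries a match result.
    static final class Either<L, R> {
        final L left;
        final R right;
        final boolean isLeft;

        private Either(L left, R right, boolean isLeft) {
            this.left = left;
            this.right = right;
            this.isLeft = isLeft;
        }

        static <L, R> Either<L, R> Left(L l)  { return new Either<>(l, null, true); }
        static <L, R> Either<L, R> Right(R r) { return new Either<>(null, r, false); }
    }

    // Routes one pattern match the way the timeout-aware select() does:
    // a non-null timeout timestamp marks a timed-out partial match.
    static <T, L, R> Either<L, R> route(
            Map<String, T> patternMatch,
            Long timeoutTimestamp,
            BiFunction<Map<String, T>, Long, L> timeoutFn,
            Function<Map<String, T>, R> selectFn) {
        if (timeoutTimestamp != null) {
            return Either.Left(timeoutFn.apply(patternMatch, timeoutTimestamp));
        } else {
            return Either.Right(selectFn.apply(patternMatch));
        }
    }

    public static void main(String[] args) {
        Map<String, String> match = new HashMap<>();
        match.put("begin", "barfoo");

        Either<String, String> complete =
            route(match, null, (m, ts) -> "timeout@" + ts, m -> "match:" + m.get("begin"));
        Either<String, String> timedOut =
            route(match, 42L, (m, ts) -> "timeout@" + ts, m -> "match:" + m.get("begin"));

        System.out.println(complete.isLeft ? complete.left : complete.right); // match:barfoo
        System.out.println(timedOut.isLeft ? timedOut.left : timedOut.right); // timeout@42
    }
}
```

The actual operator distinguishes the two cases via the stream element type rather than a nullable timestamp; the sketch only illustrates the Left/Right wrapping that gives the result stream its `Either` type.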


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/57ef6c31
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/57ef6c31
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/57ef6c31

Branch: refs/heads/master
Commit: 57ef6c315ee7aa467d922dd4d1213dfd8bc74fb0
Parents: c78b3c4
Author: Till Rohrmann <trohrmann@apache.org>
Authored: Thu May 26 11:34:16 2016 +0200
Committer: Till Rohrmann <trohrmann@apache.org>
Committed: Thu Jun 16 16:48:17 2016 +0200

----------------------------------------------------------------------
 docs/apis/streaming/libs/cep.md                 |  68 +++-
 .../org/apache/flink/cep/CEPLambdaTest.java     |  19 +-
 .../apache/flink/cep/scala/PatternStream.scala  | 217 +++++++++++-
 ...StreamScalaJavaAPIInteroperabilityTest.scala | 130 +++++++
 ...nStreamScalaJavaAPIInteroperabiliyTest.scala |  87 -----
 .../src/main/java/org/apache/flink/cep/CEP.java |  65 +---
 .../flink/cep/PatternFlatTimeoutFunction.java   |  56 +++
 .../org/apache/flink/cep/PatternStream.java     | 236 ++++++++++++-
 .../flink/cep/PatternTimeoutFunction.java       |  56 +++
 .../main/java/org/apache/flink/cep/nfa/NFA.java |  60 ++--
 .../flink/cep/nfa/compiler/NFACompiler.java     |  32 +-
 .../AbstractCEPBasePatternOperator.java         |  92 +++++
 .../operator/AbstractCEPPatternOperator.java    | 146 +++++---
 .../AbstractKeyedCEPPatternOperator.java        | 337 +++++++++++++++++++
 .../flink/cep/operator/CEPOperatorUtils.java    | 143 ++++++++
 .../flink/cep/operator/CEPPatternOperator.java  | 123 ++-----
 .../cep/operator/KeyedCEPPatternOperator.java   | 319 ++----------------
 .../cep/operator/TimeoutCEPPatternOperator.java |  79 +++++
 .../TimeoutKeyedCEPPatternOperator.java         |  72 ++++
 .../java/org/apache/flink/cep/CEPITCase.java    |  85 +++++
 .../org/apache/flink/cep/nfa/NFAITCase.java     |  94 +++++-
 .../java/org/apache/flink/cep/nfa/NFATest.java  |   8 +-
 .../flink/cep/nfa/compiler/NFACompilerTest.java |   4 +-
 .../flink/cep/operator/CEPOperatorTest.java     |   3 +-
 24 files changed, 1861 insertions(+), 670 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/57ef6c31/docs/apis/streaming/libs/cep.md
----------------------------------------------------------------------
diff --git a/docs/apis/streaming/libs/cep.md b/docs/apis/streaming/libs/cep.md
index d76aab7..d465519 100644
--- a/docs/apis/streaming/libs/cep.md
+++ b/docs/apis/streaming/libs/cep.md
@@ -435,7 +435,7 @@ class MyPatternFlatSelectFunction<IN, OUT> implements PatternFlatSelectFunction<
 </div>
 
 <div data-lang="scala" markdown="1">
-The `select` method takes a section function as argument, which is called for each matching event sequence.
+The `select` method takes a selection function as argument, which is called for each matching event sequence.
 It receives a map of string/event pairs of the matched events.
 The string is defined by the name of the state to which the event has been matched.
 The selection function returns exactly one result per call.
@@ -463,6 +463,72 @@ def flatSelectFn(pattern : mutable.Map[String, IN], collector : Collector[OUT])
 </div>
 </div>
 
+### Handling Timed Out Partial Patterns
+
+Whenever a pattern has a window length attached via the `within` keyword, partial event patterns may be discarded because they exceed the window length.
+In order to react to these timeouts, the `select` and `flatSelect` API calls allow you to specify a timeout handler.
+The timeout handler is called for each partial event pattern which has timed out.
+It receives the events matched so far for the partial pattern and the timestamp at which the timeout was detected.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+To handle timed out partial patterns, the `select` and `flatSelect` API calls offer an overloaded version which takes as the first parameter a `PatternTimeoutFunction`/`PatternFlatTimeoutFunction` and as the second parameter the known `PatternSelectFunction`/`PatternFlatSelectFunction`.
+The return type of the timeout function can differ from that of the select function.
+The timeout event and the select event are wrapped in `Either.Left` and `Either.Right` respectively, so that the resulting data stream is of type `org.apache.flink.types.Either`.
+
+{% highlight java %}
+PatternStream<Event> patternStream = CEP.pattern(input, pattern);
+
+DataStream<Either<TimeoutEvent, ComplexEvent>> result = patternStream.select(
+    new PatternTimeoutFunction<Event, TimeoutEvent>() {...},
+    new PatternSelectFunction<Event, ComplexEvent>() {...}
+);
+
+DataStream<Either<TimeoutEvent, ComplexEvent>> flatResult = patternStream.flatSelect(
+    new PatternFlatTimeoutFunction<Event, TimeoutEvent>() {...},
+    new PatternFlatSelectFunction<Event, ComplexEvent>() {...}
+);
+{% endhighlight %}
+
+</div>
+
+<div data-lang="scala" markdown="1">
+To handle timed out partial patterns, the `select` API call offers an overloaded version which takes a timeout function as the first parameter and a selection function as the second.
+The timeout function is called with a map of string-event pairs of the partial match which has timed out and a long indicating when the timeout occurred.
+The string is defined by the name of the state to which the event has been matched.
+The timeout function returns exactly one result per call.
+The return type of the timeout function can differ from that of the select function.
+The timeout event and the select event are wrapped in `Left` and `Right` respectively, so that the resulting data stream is of type `Either`.
+
+{% highlight scala %}
+val patternStream: PatternStream[Event] = CEP.pattern(input, pattern)
+
+val result: DataStream[Either[TimeoutEvent, ComplexEvent]] = patternStream.select{
+    (pattern: mutable.Map[String, Event], timestamp: Long) => TimeoutEvent()
+} {
+    pattern: mutable.Map[String, Event] => ComplexEvent()
+}
+{% endhighlight %}
+
+The `flatSelect` API call offers the same overloaded version which takes as the first parameter a timeout function and as second parameter a selection function.
+In contrast to the `select` functions, the `flatSelect` functions are additionally called with a `Collector`.
+The collector can be used to emit an arbitrary number of events.
+
+{% highlight scala %}
+val patternStream: PatternStream[Event] = CEP.pattern(input, pattern)
+
+val result: DataStream[Either[TimeoutEvent, ComplexEvent]] = patternStream.flatSelect{
+    (pattern: mutable.Map[String, Event], timestamp: Long, out: Collector[TimeoutEvent]) => 
+        out.collect(TimeoutEvent())
+} {
+    (pattern: mutable.Map[String, Event], out: Collector[ComplexEvent]) => 
+        out.collect(ComplexEvent())
+}
+{% endhighlight %}
+
+</div>
+</div>
+
 ## Examples
 
 The following example detects the pattern `start, middle(name = "error") -> end(name = "critical")` on a keyed data stream of `Events`.

http://git-wip-us.apache.org/repos/asf/flink/blob/57ef6c31/flink-java8/src/test/java/org/apache/flink/cep/CEPLambdaTest.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/cep/CEPLambdaTest.java b/flink-java8/src/test/java/org/apache/flink/cep/CEPLambdaTest.java
index 2e6fcd9..5957158 100644
--- a/flink-java8/src/test/java/org/apache/flink/cep/CEPLambdaTest.java
+++ b/flink-java8/src/test/java/org/apache/flink/cep/CEPLambdaTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.cep;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.cep.pattern.Pattern;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.transformations.SourceTransformation;
@@ -44,18 +45,18 @@ public class CEPLambdaTest extends TestLogger {
 		TypeInformation<EventA> eventTypeInformation = TypeExtractor.getForClass(EventA.class);
 		TypeInformation<EventB> outputTypeInformation = TypeExtractor.getForClass(EventB.class);
 
-		TypeInformation<Map<String, EventA>> inputTpeInformation = (TypeInformation<Map<String, EventA>>) (TypeInformation<?>) TypeInformation.of(Map.class);
 
-		DataStream<Map<String, EventA>> inputStream = new DataStream<>(
+		DataStream<EventA> inputStream = new DataStream<>(
 			StreamExecutionEnvironment.getExecutionEnvironment(),
 			new SourceTransformation<>(
 				"source",
 				null,
-				inputTpeInformation,
+				eventTypeInformation,
 				1));
 
+		Pattern<EventA, ?> dummyPattern = Pattern.begin("start");
 
-		PatternStream<EventA> patternStream = new PatternStream<>(inputStream, eventTypeInformation);
+		PatternStream<EventA> patternStream = new PatternStream<>(inputStream, dummyPattern);
 
 		DataStream<EventB> result = patternStream.select(
 			map -> new EventB()
@@ -72,17 +73,17 @@ public class CEPLambdaTest extends TestLogger {
 		TypeInformation<EventA> eventTypeInformation = TypeExtractor.getForClass(EventA.class);
 		TypeInformation<EventB> outputTypeInformation = TypeExtractor.getForClass(EventB.class);
 
-		TypeInformation<Map<String, EventA>> inputTpeInformation = (TypeInformation<Map<String, EventA>>) (TypeInformation<?>) TypeInformation.of(Map.class);
-
-		DataStream<Map<String, EventA>> inputStream = new DataStream<>(
+		DataStream<EventA> inputStream = new DataStream<>(
 			StreamExecutionEnvironment.getExecutionEnvironment(),
 			new SourceTransformation<>(
 				"source",
 				null,
-				inputTpeInformation,
+				eventTypeInformation,
 				1));
 
-		PatternStream<EventA> patternStream = new PatternStream<>(inputStream, eventTypeInformation);
+		Pattern<EventA, ?> dummyPattern = Pattern.begin("start");
+
+		PatternStream<EventA> patternStream = new PatternStream<>(inputStream, dummyPattern);
 
 		DataStream<EventB> result = patternStream.flatSelect(
 			(map, collector) -> collector.collect(new EventB())

http://git-wip-us.apache.org/repos/asf/flink/blob/57ef6c31/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/PatternStream.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/PatternStream.scala b/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/PatternStream.scala
index 22b105c..6207049 100644
--- a/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/PatternStream.scala
+++ b/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/PatternStream.scala
@@ -18,12 +18,19 @@
 package org.apache.flink.cep.scala
 
 import java.util.{Map => JMap}
+
 import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.cep.{PatternFlatSelectFunction, PatternSelectFunction,
-PatternStream => JPatternStream}
-import org.apache.flink.streaming.api.scala.DataStream
-import org.apache.flink.streaming.api.scala.asScalaStream
+import org.apache.flink.cep.{PatternFlatSelectFunction, PatternFlatTimeoutFunction, PatternSelectFunction, PatternTimeoutFunction, PatternStream => JPatternStream}
+import org.apache.flink.cep.pattern.{Pattern => JPattern}
+import org.apache.flink.streaming.api.scala._
 import org.apache.flink.util.Collector
+import org.apache.flink.types.{Either => FEither}
+import org.apache.flink.api.java.tuple.{Tuple2 => FTuple2}
+import java.lang.{Long => JLong}
+
+import org.apache.flink.cep.operator.CEPOperatorUtils
+import org.apache.flink.cep.scala.pattern.Pattern
+
 import scala.collection.JavaConverters._
 import scala.collection.mutable
 
@@ -40,6 +47,10 @@ class PatternStream[T](jPatternStream: JPatternStream[T]) {
 
   private[flink] def wrappedPatternStream = jPatternStream
 
+  def getPattern: Pattern[T, T] = Pattern(jPatternStream.getPattern.asInstanceOf[JPattern[T, T]])
+
+  def getInputStream: DataStream[T] = asScalaStream(jPatternStream.getInputStream())
+
   /**
     * Applies a select function to the detected pattern sequence. For each pattern sequence the
     * provided [[PatternSelectFunction]] is called. The pattern select function can produce
@@ -56,6 +67,55 @@ class PatternStream[T](jPatternStream: JPatternStream[T]) {
   }
 
   /**
+    * Applies a select function to the detected pattern sequence. For each pattern sequence the
+    * provided [[PatternSelectFunction]] is called. The pattern select function can produce
+    * exactly one resulting element.
+    *
+    * Additionally a timeout function is applied to partial event patterns which have timed out. For
+    * each partial pattern sequence the provided [[PatternTimeoutFunction]] is called. The pattern
+    * timeout function has to produce exactly one resulting timeout event.
+    *
+    * The resulting event and the resulting timeout event are wrapped in an [[Either]] instance.
+    *
+    * @param patternTimeoutFunction The pattern timeout function which is called for each partial
+    *                               pattern sequence which has timed out.
+    * @param patternSelectFunction  The pattern select function which is called for each detected
+    *                               pattern sequence.
+    * @tparam L Type of the resulting timeout event
+    * @tparam R Type of the resulting event
+    * @return Data stream of either type which contains the resulting events and resulting timeout
+    *         events.
+    */
+  def select[L: TypeInformation, R: TypeInformation](
+    patternTimeoutFunction: PatternTimeoutFunction[T, L],
+    patternSelectFunction: PatternSelectFunction[T, R])
+  : DataStream[Either[L, R]] = {
+
+    val patternStream = CEPOperatorUtils.createTimeoutPatternStream(
+      jPatternStream.getInputStream(),
+      jPatternStream.getPattern())
+
+    val cleanedSelect = cleanClosure(patternSelectFunction)
+    val cleanedTimeout = cleanClosure(patternTimeoutFunction)
+
+    implicit val eitherTypeInfo = createTypeInformation[Either[L, R]]
+
+    asScalaStream(patternStream).map[Either[L, R]] {
+     input: FEither[FTuple2[JMap[String, T], JLong], JMap[String, T]] =>
+       if (input.isLeft) {
+         val timeout = input.left()
+         val timeoutEvent = cleanedTimeout.timeout(timeout.f0, timeout.f1)
+         val t = Left[L, R](timeoutEvent)
+         t
+       } else {
+         val event = cleanedSelect.select(input.right())
+         val t = Right[L, R](event)
+         t
+       }
+    }
+  }
+
+  /**
     * Applies a flat select function to the detected pattern sequence. For each pattern sequence
     * the provided [[PatternFlatSelectFunction]] is called. The pattern flat select function can
     * produce an arbitrary number of resulting elements.
@@ -69,7 +129,63 @@ class PatternStream[T](jPatternStream: JPatternStream[T]) {
   def flatSelect[R: TypeInformation](patternFlatSelectFunction: PatternFlatSelectFunction[T, R])
   : DataStream[R] = {
     asScalaStream(jPatternStream
-      .flatSelect(patternFlatSelectFunction, implicitly[TypeInformation[R]]))
+                    .flatSelect(patternFlatSelectFunction, implicitly[TypeInformation[R]]))
+  }
+
+  /**
+    * Applies a flat select function to the detected pattern sequence. For each pattern sequence
+    * the provided [[PatternFlatSelectFunction]] is called. The pattern flat select function can
+    * produce an arbitrary number of resulting elements.
+    *
+    * Additionally a timeout function is applied to partial event patterns which have timed out. For
+    * each partial pattern sequence the provided [[PatternFlatTimeoutFunction]] is called. The
+    * pattern timeout function can produce an arbitrary number of resulting timeout events.
+    *
+    * The resulting event and the resulting timeout event are wrapped in an [[Either]] instance.
+    *
+    * @param patternFlatTimeoutFunction The pattern flat timeout function which is called for each
+    *                                   partially matched pattern sequence which has timed out.
+    * @param patternFlatSelectFunction  The pattern flat select function which is called for each
+    *                                   detected pattern sequence.
+    * @tparam L Type of the resulting timeout event
+    * @tparam R Type of the resulting event
+    * @return Data stream of either type which contains the resulting events and the resulting
+    *         timeout events wrapped in an [[Either]] type.
+    */
+  def flatSelect[L: TypeInformation, R: TypeInformation](
+    patternFlatTimeoutFunction: PatternFlatTimeoutFunction[T, L],
+    patternFlatSelectFunction: PatternFlatSelectFunction[T, R])
+  : DataStream[Either[L, R]] = {
+    val patternStream = CEPOperatorUtils.createTimeoutPatternStream(
+      jPatternStream.getInputStream(),
+      jPatternStream.getPattern()
+    )
+
+    val cleanedSelect = cleanClosure(patternFlatSelectFunction)
+    val cleanedTimeout = cleanClosure(patternFlatTimeoutFunction)
+
+    implicit val eitherTypeInfo = createTypeInformation[Either[L, R]]
+
+    asScalaStream(patternStream).flatMap[Either[L, R]] {
+      (input: FEither[FTuple2[JMap[String, T], JLong], JMap[String, T]],
+        collector: Collector[Either[L, R]]) =>
+
+        if (input.isLeft()) {
+          val timeout = input.left()
+
+          cleanedTimeout.timeout(timeout.f0, timeout.f1, new Collector[L]() {
+            override def collect(record: L): Unit = collector.collect(Left(record))
+
+            override def close(): Unit = collector.close()
+          })
+        } else {
+          cleanedSelect.flatSelect(input.right, new Collector[R]() {
+            override def collect(record: R): Unit = collector.collect(Right(record))
+
+            override def close(): Unit = collector.close()
+          })
+        }
+    }
   }
 
   /**
@@ -83,8 +199,9 @@ class PatternStream[T](jPatternStream: JPatternStream[T]) {
     * @return [[DataStream]] which contains the resulting elements from the pattern select function.
     */
   def select[R: TypeInformation](patternSelectFun: mutable.Map[String, T] => R): DataStream[R] = {
+    val cleanFun = cleanClosure(patternSelectFun)
+
     val patternSelectFunction: PatternSelectFunction[T, R] = new PatternSelectFunction[T, R] {
-      val cleanFun = cleanClosure(patternSelectFun)
 
       def select(in: JMap[String, T]): R = cleanFun(in.asScala)
     }
@@ -92,6 +209,46 @@ class PatternStream[T](jPatternStream: JPatternStream[T]) {
   }
 
   /**
+    * Applies a select function to the detected pattern sequence. For each pattern sequence the
+    * provided [[PatternSelectFunction]] is called. The pattern select function can produce
+    * exactly one resulting element.
+    *
+    * Additionally a timeout function is applied to partial event patterns which have timed out. For
+    * each partial pattern sequence the provided [[PatternTimeoutFunction]] is called. The pattern
+    * timeout function has to produce exactly one resulting element.
+    *
+    * The resulting event and the resulting timeout event are wrapped in an [[Either]] instance.
+    *
+    * @param patternTimeoutFunction The pattern timeout function which is called for each partial
+    *                               pattern sequence which has timed out.
+    * @param patternSelectFunction  The pattern select function which is called for each detected
+    *                               pattern sequence.
+    * @tparam L Type of the resulting timeout event
+    * @tparam R Type of the resulting event
+    * @return Data stream of either type which contains the resulting events and resulting timeout
+    *         events.
+    */
+  def select[L: TypeInformation, R: TypeInformation](
+      patternTimeoutFunction: (mutable.Map[String, T], Long) => L) (
+      patternSelectFunction: mutable.Map[String, T] => R)
+    : DataStream[Either[L, R]] = {
+
+    val cleanSelectFun = cleanClosure(patternSelectFunction)
+    val cleanTimeoutFun = cleanClosure(patternTimeoutFunction)
+
+    val patternSelectFun = new PatternSelectFunction[T, R] {
+      override def select(pattern: JMap[String, T]): R = cleanSelectFun(pattern.asScala)
+    }
+    val patternTimeoutFun = new PatternTimeoutFunction[T, L] {
+      override def timeout(pattern: JMap[String, T], timeoutTimestamp: Long): L = {
+        cleanTimeoutFun(pattern.asScala, timeoutTimestamp)
+      }
+    }
+
+    select(patternTimeoutFun, patternSelectFun)
+  }
+
+  /**
     * Applies a flat select function to the detected pattern sequence. For each pattern sequence
     * the provided [[PatternFlatSelectFunction]] is called. The pattern flat select function
     * can produce an arbitrary number of resulting elements.
@@ -104,9 +261,10 @@ class PatternStream[T](jPatternStream: JPatternStream[T]) {
     */
   def flatSelect[R: TypeInformation](patternFlatSelectFun: (mutable.Map[String, T],
     Collector[R]) => Unit): DataStream[R] = {
+    val cleanFun = cleanClosure(patternFlatSelectFun)
+
     val patternFlatSelectFunction: PatternFlatSelectFunction[T, R] =
       new PatternFlatSelectFunction[T, R] {
-        val cleanFun = cleanClosure(patternFlatSelectFun)
 
         def flatSelect(pattern: JMap[String, T], out: Collector[R]): Unit =
           cleanFun(pattern.asScala, out)
@@ -114,6 +272,51 @@ class PatternStream[T](jPatternStream: JPatternStream[T]) {
     flatSelect(patternFlatSelectFunction)
   }
 
+  /**
+    * Applies a flat select function to the detected pattern sequence. For each pattern sequence
+    * the provided [[PatternFlatSelectFunction]] is called. The pattern flat select function can
+    * produce an arbitrary number of resulting elements.
+    *
+    * Additionally a timeout function is applied to partial event patterns which have timed out. For
+    * each partial pattern sequence the provided [[PatternFlatTimeoutFunction]] is called. The
+    * pattern timeout function can produce an arbitrary number of resulting timeout events.
+    *
+    * The resulting event and the resulting timeout event are wrapped in an [[Either]] instance.
+    *
+    * @param patternFlatTimeoutFunction The pattern flat timeout function which is called for each
+    *                                   partially matched pattern sequence which has timed out.
+    * @param patternFlatSelectFunction  The pattern flat select function which is called for each
+    *                                   detected pattern sequence.
+    * @tparam L Type of the resulting timeout event
+    * @tparam R Type of the resulting event
+    * @return Data stream of either type which contains the resulting events and the resulting
+    *                                   timeout events wrapped in an [[Either]] type.
+    */
+  def flatSelect[L: TypeInformation, R: TypeInformation](
+      patternFlatTimeoutFunction: (mutable.Map[String, T], Long, Collector[L]) => Unit) (
+      patternFlatSelectFunction: (mutable.Map[String, T], Collector[R]) => Unit)
+    : DataStream[Either[L, R]] = {
+
+    val cleanSelectFun = cleanClosure(patternFlatSelectFunction)
+    val cleanTimeoutFun = cleanClosure(patternFlatTimeoutFunction)
+
+    val patternFlatSelectFun = new PatternFlatSelectFunction[T, R] {
+      override def flatSelect(pattern: JMap[String, T], out: Collector[R]): Unit = {
+        cleanSelectFun(pattern.asScala, out)
+      }
+    }
+
+    val patternFlatTimeoutFun = new PatternFlatTimeoutFunction[T, L] {
+      override def timeout(
+        pattern: JMap[String, T],
+        timeoutTimestamp: Long, out: Collector[L])
+      : Unit = {
+        cleanTimeoutFun(pattern.asScala, timeoutTimestamp, out)
+      }
+    }
+
+    flatSelect(patternFlatTimeoutFun, patternFlatSelectFun)
+  }
 }
 
 object PatternStream {

http://git-wip-us.apache.org/repos/asf/flink/blob/57ef6c31/flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/PatternStreamScalaJavaAPIInteroperabilityTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/PatternStreamScalaJavaAPIInteroperabilityTest.scala b/flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/PatternStreamScalaJavaAPIInteroperabilityTest.scala
new file mode 100644
index 0000000..6fe68c8
--- /dev/null
+++ b/flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/PatternStreamScalaJavaAPIInteroperabilityTest.scala
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.cep.scala
+
+import org.apache.flink.api.common.functions.util.ListCollector
+import org.apache.flink.cep.scala.pattern.Pattern
+import org.apache.flink.streaming.api.operators.{StreamFlatMap, StreamMap}
+import org.apache.flink.streaming.api.scala._
+import org.apache.flink.streaming.api.transformations.OneInputTransformation
+import org.apache.flink.util.{Collector, TestLogger}
+import org.apache.flink.types.{Either => FEither}
+import org.apache.flink.api.java.tuple.{Tuple2 => FTuple2}
+
+import java.lang.{Long => JLong}
+import java.util.{Map => JMap}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import org.junit.Assert._
+import org.junit.Test
+
+class PatternStreamScalaJavaAPIInteroperabilityTest extends TestLogger {
+
+  @Test
+  @throws[Exception]
+  def testScalaJavaAPISelectFunForwarding {
+    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
+    val dummyDataStream: DataStream[(Int, Int)] = env.fromElements()
+    val pattern: Pattern[(Int, Int), _] = Pattern.begin[(Int, Int)]("dummy")
+    val pStream: PatternStream[(Int, Int)] = CEP.pattern(dummyDataStream, pattern)
+    val param = mutable.Map("begin" ->(1, 2)).asJava
+    val result: DataStream[(Int, Int)] = pStream
+      .select((pattern: mutable.Map[String, (Int, Int)]) => {
+        //verifies input parameter forwarding
+        assertEquals(param, pattern.asJava)
+        param.get("begin")
+      })
+    val out = extractUserFunction[StreamMap[java.util.Map[String, (Int, Int)], (Int, Int)]](result)
+      .getUserFunction.map(param)
+    //verifies output parameter forwarding
+    assertEquals(param.get("begin"), out)
+  }
+
+  @Test
+  @throws[Exception]
+  def testScalaJavaAPIFlatSelectFunForwarding {
+    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
+    val dummyDataStream: DataStream[List[Int]] = env.fromElements()
+    val pattern: Pattern[List[Int], _] = Pattern.begin[List[Int]]("dummy")
+    val pStream: PatternStream[List[Int]] = CEP.pattern(dummyDataStream, pattern)
+    val inList = List(1, 2, 3)
+    val inParam = mutable.Map("begin" -> inList).asJava
+    val outList = new java.util.ArrayList[List[Int]]
+    val outParam = new ListCollector[List[Int]](outList)
+
+    val result: DataStream[List[Int]] = pStream
+
+      .flatSelect((pattern: mutable.Map[String, List[Int]], out: Collector[List[Int]]) => {
+        //verifies input parameter forwarding
+        assertEquals(inParam, pattern.asJava)
+        out.collect(pattern.get("begin").get)
+      })
+
+    extractUserFunction[StreamFlatMap[java.util.Map[String, List[Int]], List[Int]]](result).
+      getUserFunction.flatMap(inParam, outParam)
+    //verify output parameter forwarding and that flatMap function was actually called
+    assertEquals(inList, outList.get(0))
+  }
+
+  @Test
+  @throws[Exception]
+  def testTimeoutHandling: Unit = {
+    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
+    val dummyDataStream: DataStream[String] = env.fromElements()
+    val pattern: Pattern[String, _] = Pattern.begin[String]("dummy")
+    val pStream: PatternStream[String] = CEP.pattern(dummyDataStream, pattern)
+    val inParam = mutable.Map("begin" -> "barfoo").asJava
+    val outList = new java.util.ArrayList[Either[String, String]]
+    val output = new ListCollector[Either[String, String]](outList)
+    val expectedOutput = List(Right("match"), Right("barfoo"), Left("timeout"), Left("barfoo"))
+      .asJava
+
+    val result: DataStream[Either[String, String]] = pStream.flatSelect {
+        (pattern: mutable.Map[String, String], timestamp: Long, out: Collector[String]) =>
+          out.collect("timeout")
+          out.collect(pattern("begin"))
+      } {
+        (pattern: mutable.Map[String, String], out: Collector[String]) =>
+          //verifies input parameter forwarding
+          assertEquals(inParam, pattern.asJava)
+          out.collect("match")
+          out.collect(pattern("begin"))
+      }
+
+    val fun = extractUserFunction[
+      StreamFlatMap[
+        FEither[
+          FTuple2[JMap[String, String], JLong],
+          JMap[String, String]],
+        Either[String, String]]](result)
+
+    fun.getUserFunction.flatMap(FEither.Right(inParam), output)
+    fun.getUserFunction.flatMap(FEither.Left(FTuple2.of(inParam, 42L)), output)
+
+    assertEquals(expectedOutput, outList)
+  }
+
+  def extractUserFunction[T](dataStream: DataStream[_]) = {
+    dataStream.javaStream
+      .getTransformation
+      .asInstanceOf[OneInputTransformation[_, _]]
+      .getOperator
+      .asInstanceOf[T]
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/57ef6c31/flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/PatternStreamScalaJavaAPIInteroperabiliyTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/PatternStreamScalaJavaAPIInteroperabiliyTest.scala b/flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/PatternStreamScalaJavaAPIInteroperabiliyTest.scala
deleted file mode 100644
index 7daebfe..0000000
--- a/flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/PatternStreamScalaJavaAPIInteroperabiliyTest.scala
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.cep.scala
-
-import org.apache.flink.api.common.functions.util.ListCollector
-import org.apache.flink.cep.scala.pattern.Pattern
-import org.apache.flink.streaming.api.operators.{StreamFlatMap, StreamMap}
-import org.apache.flink.streaming.api.scala._
-import org.apache.flink.streaming.api.transformations.OneInputTransformation
-import org.apache.flink.util.{Collector, TestLogger}
-
-import scala.collection.JavaConverters._
-import scala.collection.mutable
-import org.junit.Assert._
-import org.junit.Test
-
-class PatternStreamScalaJavaAPIInteroperabiliyTest extends TestLogger {
-
-  @Test
-  @throws[Exception]
-  def testScalaJavaAPISelectFunForwarding {
-    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
-    val dummyDataStream: DataStream[(Int, Int)] = env.fromElements()
-    val pattern: Pattern[(Int, Int), _] = Pattern.begin[(Int, Int)]("dummy")
-    val pStream: PatternStream[(Int, Int)] = CEP.pattern(dummyDataStream, pattern)
-    val param = mutable.Map("begin" ->(1, 2)).asJava
-    val result: DataStream[(Int, Int)] = pStream
-      .select((pattern: mutable.Map[String, (Int, Int)]) => {
-        //verifies input parameter forwarding
-        assertEquals(param, pattern.asJava)
-        param.get("begin")
-      })
-    val out = extractUserFunction[StreamMap[java.util.Map[String, (Int, Int)], (Int, Int)]](result)
-      .getUserFunction.map(param)
-    //verifies output parameter forwarding
-    assertEquals(param.get("begin"), out)
-  }
-
-  @Test
-  @throws[Exception]
-  def testScalaJavaAPIFlatSelectFunForwarding {
-    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
-    val dummyDataStream: DataStream[List[Int]] = env.fromElements()
-    val pattern: Pattern[List[Int], _] = Pattern.begin[List[Int]]("dummy")
-    val pStream: PatternStream[List[Int]] = CEP.pattern(dummyDataStream, pattern)
-    val inList = List(1, 2, 3)
-    val inParam = mutable.Map("begin" -> inList).asJava
-    val outList = new java.util.ArrayList[List[Int]]
-    val outParam = new ListCollector[List[Int]](outList)
-
-    val result: DataStream[List[Int]] = pStream
-
-      .flatSelect((pattern: mutable.Map[String, List[Int]], out: Collector[List[Int]]) => {
-        //verifies input parameter forwarding
-        assertEquals(inParam, pattern.asJava)
-        out.collect(pattern.get("begin").get)
-      })
-
-    extractUserFunction[StreamFlatMap[java.util.Map[String, List[Int]], List[Int]]](result).
-      getUserFunction.flatMap(inParam, outParam)
-    //verify output parameter forwarding and that flatMap function was actually called
-    assertEquals(inList, outList.get(0))
-  }
-
-  def extractUserFunction[T](dataStream: DataStream[_]) = {
-    dataStream.javaStream
-      .getTransformation
-      .asInstanceOf[OneInputTransformation[_, _]]
-      .getOperator
-      .asInstanceOf[T]
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/57ef6c31/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/CEP.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/CEP.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/CEP.java
index 60e0bf8..9ce9f77 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/CEP.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/CEP.java
@@ -18,20 +18,8 @@
 
 package org.apache.flink.cep;
 
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.cep.nfa.State;
-import org.apache.flink.cep.nfa.compiler.NFACompiler;
-import org.apache.flink.cep.operator.CEPPatternOperator;
-import org.apache.flink.cep.operator.KeyedCEPPatternOperator;
 import org.apache.flink.cep.pattern.Pattern;
-import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.KeyedStream;
-
-import java.util.Map;
 
 /**
  * Utility class for complex event processing.
@@ -39,62 +27,15 @@ import java.util.Map;
  * Methods which transform a {@link DataStream} into a {@link PatternStream} to do CEP.
  */
 public class CEP {
-	private static final String PATTERN_OPERATOR_NAME = "AbstractCEPPatternOperator";
-
 	/**
-	 * Transforms a {@link DataStream<T>} into a {@link PatternStream<T>}. The PatternStream detects
-	 * the provided event pattern and emits the patterns as a {@link Map<String, T>} where each event
-	 * is identified by a String. The String is the name of the {@link State <T>} to which the event
-	 * has been associated.
-	 *
-	 * Depending on the input {@link DataStream<T>} type, keyed vs. non-keyed, a different
-	 * {@link org.apache.flink.cep.operator.AbstractCEPPatternOperator<T>} is instantiated.
+	 * Creates a {@link PatternStream} from an input data stream and a pattern.
 	 *
 	 * @param input DataStream containing the input events
 	 * @param pattern Pattern specification which shall be detected
 	 * @param <T> Type of the input events
-	 * @param <K> Type of the key in case of a KeyedStream (necessary to bind keySelector and
-	 *            keySerializer to the same type)
 	 * @return Resulting pattern stream
 	 */
-	public static <T, K> PatternStream<T> pattern(DataStream<T> input, Pattern<T, ?> pattern) {
-		final TypeSerializer<T> inputSerializer = input.getType().createSerializer(input.getExecutionConfig());
-
-		// check whether we use processing time
-		final boolean isProcessingTime = input.getExecutionEnvironment().getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime;
-
-		// compile our pattern into a NFAFactory to instantiate NFAs later on
-		final NFACompiler.NFAFactory<T> nfaFactory = NFACompiler.compileFactory(pattern, inputSerializer);
-
-		final DataStream<Map<String, T>> patternStream;
-
-		if (input instanceof KeyedStream) {
-			// We have to use the KeyedCEPPatternOperator which can deal with keyed input streams
-			KeyedStream<T, K> keyedStream= (KeyedStream<T, K>) input;
-
-			KeySelector<T, K> keySelector = keyedStream.getKeySelector();
-			TypeSerializer<K> keySerializer = keyedStream.getKeyType().createSerializer(keyedStream.getExecutionConfig());
-
-			patternStream = keyedStream.transform(
-				PATTERN_OPERATOR_NAME,
-				(TypeInformation<Map<String, T>>) (TypeInformation<?>) TypeExtractor.getForClass(Map.class),
-				new KeyedCEPPatternOperator<>(
-					inputSerializer,
-					isProcessingTime,
-					keySelector,
-					keySerializer,
-					nfaFactory));
-		} else {
-			patternStream = input.transform(
-				PATTERN_OPERATOR_NAME,
-				(TypeInformation<Map<String, T>>) (TypeInformation<?>) TypeExtractor.getForClass(Map.class),
-				new CEPPatternOperator<T>(
-					inputSerializer,
-					isProcessingTime,
-					nfaFactory
-				)).setParallelism(1);
-		}
-
-		return new PatternStream<>(patternStream, input.getType());
+	public static <T> PatternStream<T> pattern(DataStream<T> input, Pattern<T, ?> pattern) {
+		return new PatternStream<>(input, pattern);
 	}
 }
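The diff above reduces `CEP.pattern` to a plain factory: instead of compiling the NFA and wiring the operator eagerly, the input stream and pattern are captured and the actual translation is deferred until a `select`/`flatSelect` call on the `PatternStream`. A self-contained sketch of this deferral, using simplified stand-in types (not the real Flink classes):

```java
import java.util.function.Function;

// Sketch of the refactoring: the factory no longer builds the operator;
// it just captures inputs, and translation happens at select() time.
// All types here are simplified stand-ins, not the real Flink classes.
class Pattern {
    final String name;
    Pattern(String name) { this.name = name; }
}

class PatternStream<T> {
    private final String inputStream; // stands in for DataStream<T>
    private final Pattern pattern;

    PatternStream(String inputStream, Pattern pattern) {
        this.inputStream = inputStream;
        this.pattern = pattern;
    }

    // The (stand-in) translation step, deferred until a result is requested.
    <R> R select(Function<String, R> selectFn) {
        return selectFn.apply(inputStream + ":" + pattern.name);
    }
}

public class CepSketch {
    // Mirrors the new CEP.pattern: a two-line factory, no operator wiring.
    public static <T> PatternStream<T> pattern(String input, Pattern p) {
        return new PatternStream<>(input, p);
    }

    public static void main(String[] args) {
        PatternStream<String> ps = pattern("events", new Pattern("begin"));
        System.out.println(ps.select(s -> "matched " + s)); // matched events:begin
    }
}
```

Deferring translation is what allows `select`/`flatSelect` to pick between `createPatternStream` and `createTimeoutPatternStream` later on.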

http://git-wip-us.apache.org/repos/asf/flink/blob/57ef6c31/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternFlatTimeoutFunction.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternFlatTimeoutFunction.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternFlatTimeoutFunction.java
new file mode 100644
index 0000000..661d32a
--- /dev/null
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternFlatTimeoutFunction.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep;
+
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.util.Collector;
+
+import java.io.Serializable;
+import java.util.Map;
+
+/**
+ * Base interface for a pattern timeout function which can produce multiple resulting elements. A
+ * pattern flat timeout function is called with a map of partial events which are identified by
+ * their names and the timestamp when the timeout occurred. The names are defined by the
+ * {@link org.apache.flink.cep.pattern.Pattern} specifying the sought-after pattern.
+ * Additionally, a collector is provided as a parameter. The collector is used to emit an arbitrary
+ * number of resulting elements.
+ *
+ * <pre>{@code
+ * PatternStream<IN> pattern = ...
+ *
+ * DataStream<OUT> result = pattern.flatSelect(..., new MyPatternFlatTimeoutFunction());
+ * }</pre>
+ * @param <IN> Type of the input elements
+ * @param <OUT> Type of the output elements
+ */
+public interface PatternFlatTimeoutFunction<IN, OUT> extends Function, Serializable {
+
+	/**
+	 * Generates zero or more resulting timeout elements given a map of partial pattern events and
+	 * the timestamp of the timeout. The events are identified by their specified names.
+	 *
+	 * @param pattern Map containing the partial pattern. Events are identified by their names.
+	 * @param timeoutTimestamp Timestamp when the timeout occurred
+	 * @param out Collector used to output the generated elements
+	 * @throws Exception This method may throw exceptions. Throwing an exception will cause the
+	 * 					 operation to fail and may trigger recovery.
+	 */
+	void timeout(Map<String, IN> pattern, long timeoutTimestamp, Collector<OUT> out) throws Exception;
+}
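A self-contained sketch of implementing the interface above. `Collector` and `PatternFlatTimeoutFunction` are re-declared locally (stand-ins for the real types in `org.apache.flink.util` and `org.apache.flink.cep`, minus the `Function`/`Serializable` supertypes) so the example compiles without Flink on the classpath:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Local stand-ins so the sketch is self-contained.
interface Collector<T> { void collect(T record); void close(); }

interface PatternFlatTimeoutFunction<IN, OUT> {
    void timeout(Map<String, IN> pattern, long timeoutTimestamp, Collector<OUT> out) throws Exception;
}

public class FlatTimeoutSketch {
    public static List<String> run(Map<String, String> partial, long ts) throws Exception {
        List<String> results = new ArrayList<>();
        Collector<String> out = new Collector<String>() {
            public void collect(String record) { results.add(record); }
            public void close() {}
        };

        // Emit one element per timed-out partial event; zero or more is allowed.
        PatternFlatTimeoutFunction<String, String> fn = (pattern, timeoutTimestamp, collector) -> {
            for (Map.Entry<String, String> e : pattern.entrySet()) {
                collector.collect(e.getKey() + " timed out at " + timeoutTimestamp + ": " + e.getValue());
            }
        };

        fn.timeout(partial, ts, out);
        return results;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(run(Map.of("begin", "event-1"), 42L));
    }
}
```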

http://git-wip-us.apache.org/repos/asf/flink/blob/57ef6c31/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java
index 57c5a9b..efcd16c 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java
@@ -21,8 +21,13 @@ package org.apache.flink.cep;
 import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.EitherTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.cep.operator.CEPOperatorUtils;
+import org.apache.flink.cep.pattern.Pattern;
 import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.types.Either;
 import org.apache.flink.util.Collector;
 
 import java.util.Map;
@@ -33,18 +38,29 @@ import java.util.Map;
  * {@link org.apache.flink.cep.nfa.NFA}. In order to process the detected sequences, the user
  * has to specify a {@link PatternSelectFunction} or a {@link PatternFlatSelectFunction}.
  *
+ * Additionally, it allows handling partially matched event patterns which have timed out. For this
+ * the user has to specify a {@link PatternTimeoutFunction} or a {@link PatternFlatTimeoutFunction}.
+ *
  * @param <T> Type of the events
  */
 public class PatternStream<T> {
 
 	// underlying data stream
-	private final DataStream<Map<String, T>> patternStream;
-	// type information of input type T
-	private final TypeInformation<T> inputType;
+	private final DataStream<T> inputStream;
+
+	private final Pattern<T, ?> pattern;
+
+	PatternStream(final DataStream<T> inputStream, final Pattern<T, ?> pattern) {
+		this.inputStream = inputStream;
+		this.pattern = pattern;
+	}
 
-	PatternStream(final DataStream<Map<String, T>> patternStream, final TypeInformation<T> inputType) {
-		this.patternStream = patternStream;
-		this.inputType = inputType;
+	public Pattern<T, ?> getPattern() {
+		return pattern;
+	}
+
+	public DataStream<T> getInputStream() {
+		return inputStream;
 	}
 
 	/**
@@ -67,7 +83,7 @@ public class PatternStream<T> {
 			PatternSelectFunction.class,
 			1,
 			-1,
-			inputType,
+			inputStream.getType(),
 			null,
 			false);
 
@@ -87,6 +103,8 @@ public class PatternStream<T> {
 	 *         function.
 	 */
 	public <R> DataStream<R> select(final PatternSelectFunction<T, R> patternSelectFunction, TypeInformation<R> outTypeInfo) {
+		DataStream<Map<String, T>> patternStream = CEPOperatorUtils.createPatternStream(inputStream, pattern);
+
 		return patternStream.map(
 			new PatternSelectMapper<>(
 				patternStream.getExecutionEnvironment().clean(patternSelectFunction)))
@@ -94,13 +112,65 @@ public class PatternStream<T> {
 	}
 
 	/**
+	 * Applies a select function to the detected pattern sequence. For each pattern sequence the
+	 * provided {@link PatternSelectFunction} is called. The pattern select function can produce
+	 * exactly one resulting element.
+	 *
+	 * Applies a timeout function to a partial pattern sequence which has timed out. For each
+	 * partial pattern sequence the provided {@link PatternTimeoutFunction} is called. The pattern
+	 * timeout function can produce exactly one resulting element.
+	 *
+	 * @param patternTimeoutFunction The pattern timeout function which is called for each partial
+	 *                               pattern sequence which has timed out.
+	 * @param patternSelectFunction The pattern select function which is called for each detected
+	 *                              pattern sequence.
+	 * @param <L> Type of the resulting timeout elements
+	 * @param <R> Type of the resulting elements
+	 * @return {@link DataStream} which contains the resulting elements or the resulting timeout
+	 * elements wrapped in an {@link Either} type.
+	 */
+	public <L, R> DataStream<Either<L, R>> select(
+		final PatternTimeoutFunction<T, L> patternTimeoutFunction,
+		final PatternSelectFunction<T, R> patternSelectFunction) {
+
+		DataStream<Either<Tuple2<Map<String, T>, Long>, Map<String, T>>> patternStream = CEPOperatorUtils.createTimeoutPatternStream(inputStream, pattern);
+
+		TypeInformation<L> leftTypeInfo = TypeExtractor.getUnaryOperatorReturnType(
+			patternTimeoutFunction,
+			PatternTimeoutFunction.class,
+			1,
+			-1,
+			inputStream.getType(),
+			null,
+			false);
+
+		TypeInformation<R> rightTypeInfo = TypeExtractor.getUnaryOperatorReturnType(
+			patternSelectFunction,
+			PatternSelectFunction.class,
+			1,
+			-1,
+			inputStream.getType(),
+			null,
+			false);
+
+		TypeInformation<Either<L, R>> outTypeInfo = new EitherTypeInfo<>(leftTypeInfo, rightTypeInfo);
+
+		return patternStream.map(
+			new PatternSelectTimeoutMapper<>(
+				patternStream.getExecutionEnvironment().clean(patternSelectFunction),
+				patternStream.getExecutionEnvironment().clean(patternTimeoutFunction)
+			)
+		).returns(outTypeInfo);
+	}
+
+	/**
 	 * Applies a flat select function to the detected pattern sequence. For each pattern sequence
 	 * the provided {@link PatternFlatSelectFunction} is called. The pattern flat select function
 	 * can produce an arbitrary number of resulting elements.
 	 *
 	 * @param patternFlatSelectFunction The pattern flat select function which is called for each
 	 *                                  detected pattern sequence.
-	 * @param <R> Typ of the resulting elements
+	 * @param <R> Type of the resulting elements
 	 * @return {@link DataStream} which contains the resulting elements from the pattern flat select
 	 *         function.
 	 */
@@ -112,7 +182,7 @@ public class PatternStream<T> {
 			PatternFlatSelectFunction.class,
 			1,
 			0,
-			inputType,
+			inputStream.getType(),
 			null,
 			false);
 
@@ -126,12 +196,14 @@ public class PatternStream<T> {
 	 *
 	 * @param patternFlatSelectFunction The pattern flat select function which is called for each
 	 *                                  detected pattern sequence.
-	 * @param <R> Typ of the resulting elements
+	 * @param <R> Type of the resulting elements
 	 * @param outTypeInfo Explicit specification of output type.
 	 * @return {@link DataStream} which contains the resulting elements from the pattern flat select
 	 *         function.
 	 */
 	public <R> DataStream<R> flatSelect(final PatternFlatSelectFunction<T, R> patternFlatSelectFunction, TypeInformation<R> outTypeInfo) {
+		DataStream<Map<String, T>> patternStream = CEPOperatorUtils.createPatternStream(inputStream, pattern);
+
 		return patternStream.flatMap(
 			new PatternFlatSelectMapper<>(
 				patternStream.getExecutionEnvironment().clean(patternFlatSelectFunction)
@@ -139,6 +211,59 @@ public class PatternStream<T> {
 	}
 
 	/**
+	 * Applies a flat select function to the detected pattern sequence. For each pattern sequence
+	 * the provided {@link PatternFlatSelectFunction} is called. The pattern flat select function
+	 * can produce an arbitrary number of resulting elements.
+	 *
+	 * Applies a timeout function to a partial pattern sequence which has timed out. For each
+	 * partial pattern sequence the provided {@link PatternFlatTimeoutFunction} is called. The
+	 * pattern timeout function can produce an arbitrary number of resulting elements.
+	 *
+	 * @param patternFlatTimeoutFunction The pattern flat timeout function which is called for each
+	 *                                   partial pattern sequence which has timed out.
+	 * @param patternFlatSelectFunction The pattern flat select function which is called for each
+	 *                                  detected pattern sequence.
+	 * @param <L> Type of the resulting timeout events
+	 * @param <R> Type of the resulting events
+	 * @return {@link DataStream} which contains the resulting events from the pattern flat select
+	 * function or the resulting timeout events from the pattern flat timeout function wrapped in an
+	 * {@link Either} type.
+	 */
+	public <L, R> DataStream<Either<L, R>> flatSelect(
+		final PatternFlatTimeoutFunction<T, L> patternFlatTimeoutFunction,
+		final PatternFlatSelectFunction<T, R> patternFlatSelectFunction) {
+
+		DataStream<Either<Tuple2<Map<String, T>, Long>, Map<String, T>>> patternStream = CEPOperatorUtils.createTimeoutPatternStream(inputStream, pattern);
+
+		TypeInformation<L> leftTypeInfo = TypeExtractor.getUnaryOperatorReturnType(
+			patternFlatTimeoutFunction,
+			PatternFlatTimeoutFunction.class,
+			1,
+			-1,
+			inputStream.getType(),
+			null,
+			false);
+
+		TypeInformation<R> rightTypeInfo = TypeExtractor.getUnaryOperatorReturnType(
+			patternFlatSelectFunction,
+			PatternFlatSelectFunction.class,
+			1,
+			-1,
+			inputStream.getType(),
+			null,
+			false);
+
+		TypeInformation<Either<L, R>> outTypeInfo = new EitherTypeInfo<>(leftTypeInfo, rightTypeInfo);
+
+		return patternStream.flatMap(
+			new PatternFlatSelectTimeoutWrapper<>(
+				patternStream.getExecutionEnvironment().clean(patternFlatSelectFunction),
+				patternStream.getExecutionEnvironment().clean(patternFlatTimeoutFunction)
+			)
+		).returns(outTypeInfo);
+	}
+
+	/**
 	 * Wrapper for a {@link PatternSelectFunction}.
 	 *
 	 * @param <T> Type of the input elements
@@ -159,6 +284,97 @@ public class PatternStream<T> {
 		}
 	}
 
+	private static class PatternSelectTimeoutMapper<T, L, R> implements MapFunction<Either<Tuple2<Map<String, T>, Long>, Map<String, T>>, Either<L, R>> {
+
+		private static final long serialVersionUID = 8259477556738887724L;
+
+		private final PatternSelectFunction<T, R> patternSelectFunction;
+		private final PatternTimeoutFunction<T, L> patternTimeoutFunction;
+
+		public PatternSelectTimeoutMapper(
+			PatternSelectFunction<T, R> patternSelectFunction,
+			PatternTimeoutFunction<T, L> patternTimeoutFunction) {
+
+			this.patternSelectFunction = patternSelectFunction;
+			this.patternTimeoutFunction = patternTimeoutFunction;
+		}
+
+		@Override
+		public Either<L, R> map(Either<Tuple2<Map<String, T>, Long>, Map<String, T>> value) throws Exception {
+			if (value.isLeft()) {
+				Tuple2<Map<String, T>, Long> timeout = value.left();
+
+				return Either.Left(patternTimeoutFunction.timeout(timeout.f0, timeout.f1));
+			} else {
+				return Either.Right(patternSelectFunction.select(value.right()));
+			}
+		}
+	}
+
+	private static class PatternFlatSelectTimeoutWrapper<T, L, R> implements FlatMapFunction<Either<Tuple2<Map<String, T>, Long>, Map<String, T>>, Either<L, R>> {
+
+		private static final long serialVersionUID = 7483674669662261667L;
+
+		private final PatternFlatSelectFunction<T, R> patternFlatSelectFunction;
+		private final PatternFlatTimeoutFunction<T, L> patternFlatTimeoutFunction;
+
+		public PatternFlatSelectTimeoutWrapper(
+			PatternFlatSelectFunction<T, R> patternFlatSelectFunction,
+			PatternFlatTimeoutFunction<T, L> patternFlatTimeoutFunction) {
+			this.patternFlatSelectFunction = patternFlatSelectFunction;
+			this.patternFlatTimeoutFunction = patternFlatTimeoutFunction;
+		}
+
+		@Override
+		public void flatMap(Either<Tuple2<Map<String, T>, Long>, Map<String, T>> value, Collector<Either<L, R>> out) throws Exception {
+			if (value.isLeft()) {
+				Tuple2<Map<String, T>, Long> timeout = value.left();
+
+				patternFlatTimeoutFunction.timeout(timeout.f0, timeout.f1, new LeftCollector<>(out));
+			} else {
+				patternFlatSelectFunction.flatSelect(value.right(), new RightCollector(out));
+			}
+		}
+
+		private static class LeftCollector<L, R> implements Collector<L> {
+
+			private final Collector<Either<L, R>> out;
+
+			private LeftCollector(Collector<Either<L, R>> out) {
+				this.out = out;
+			}
+
+			@Override
+			public void collect(L record) {
+				out.collect(Either.<L, R>Left(record));
+			}
+
+			@Override
+			public void close() {
+				out.close();
+			}
+		}
+
+		private static class RightCollector<L, R> implements Collector<R> {
+
+			private final Collector<Either<L, R>> out;
+
+			private RightCollector(Collector<Either<L, R>> out) {
+				this.out = out;
+			}
+
+			@Override
+			public void collect(R record) {
+				out.collect(Either.<L, R>Right(record));
+			}
+
+			@Override
+			public void close() {
+				out.close();
+			}
+		}
+	}
+
 	/**
 	 * Wrapper for a {@link PatternFlatSelectFunction}.
 	 *
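The `PatternSelectTimeoutMapper` added above routes each element of the merged stream to exactly one of the two user functions and tags the result as `Left` (timeout) or `Right` (match). A compilable sketch of that dispatch, with a tiny stand-in `Either` and the `Tuple2` timeout timestamp omitted for brevity:

```java
import java.util.Map;
import java.util.function.Function;

public class EitherDispatchSketch {

    // Minimal stand-in for org.apache.flink.types.Either.
    public static final class Either<L, R> {
        final L left; final R right; final boolean isLeft;
        private Either(L l, R r, boolean isLeft) { this.left = l; this.right = r; this.isLeft = isLeft; }
        public static <L, R> Either<L, R> left(L l)  { return new Either<>(l, null, true); }
        public static <L, R> Either<L, R> right(R r) { return new Either<>(null, r, false); }
    }

    // Mirrors PatternSelectTimeoutMapper.map: one user function per branch,
    // timeouts on the left, completed matches on the right.
    public static <T, L, R> Either<L, R> dispatch(
            Either<Map<String, T>, Map<String, T>> value,
            Function<Map<String, T>, L> timeoutFn,
            Function<Map<String, T>, R> selectFn) {
        return value.isLeft
            ? Either.left(timeoutFn.apply(value.left))
            : Either.right(selectFn.apply(value.right));
    }

    public static void main(String[] args) {
        Either<Map<String, String>, Map<String, String>> match =
            Either.right(Map.of("begin", "a"));
        Either<String, String> out =
            dispatch(match, m -> "timeout", m -> "match:" + m.get("begin"));
        System.out.println(out.right); // match:a
    }
}
```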

http://git-wip-us.apache.org/repos/asf/flink/blob/57ef6c31/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternTimeoutFunction.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternTimeoutFunction.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternTimeoutFunction.java
new file mode 100644
index 0000000..974d6df
--- /dev/null
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternTimeoutFunction.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep;
+
+import org.apache.flink.api.common.functions.Function;
+
+import java.io.Serializable;
+import java.util.Map;
+
+/**
+ * Base interface for a pattern timeout function. A pattern timeout function is called with a
+ * map containing the timed out partial events which can be accessed by their names and the
+ * timestamp when the timeout occurred. The names depend on the definition of the
+ * {@link org.apache.flink.cep.pattern.Pattern}. The timeout method returns exactly one result. If
+ * you want to return more than one result, then you have to implement a
+ * {@link PatternFlatTimeoutFunction}.
+ *
+ * <pre>{@code
+ * PatternStream<IN> pattern = ...;
+ *
+ * DataStream<OUT> result = pattern.select(..., new MyPatternTimeoutFunction());
+ * }</pre>
+ *
+ * @param <IN> Type of the input elements
+ * @param <OUT> Type of the output element
+ */
+public interface PatternTimeoutFunction<IN, OUT> extends Function, Serializable {
+
+	/**
+	 * Generates a timeout result from the given map of events and timeout timestamp. The partial
+	 * events are identified by their names. Only one resulting element can be generated.
+	 *
+	 * @param pattern Map containing the found partial pattern. Events are identified by their names
+	 * @param timeoutTimestamp Timestamp of the timeout
+	 * @return Resulting timeout element
+	 * @throws Exception This method may throw exceptions. Throwing an exception will cause the
+	 * 					 operation to fail and may trigger recovery.
+	 */
+	OUT timeout(Map<String, IN> pattern, long timeoutTimestamp) throws Exception;
+}
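A self-contained sketch of implementing this interface as a lambda. The interface is re-declared locally (the real one also extends Flink's `Function` and `Serializable`) so the example compiles without Flink:

```java
import java.util.Map;

// Local stand-in for org.apache.flink.cep.PatternTimeoutFunction.
interface PatternTimeoutFunction<IN, OUT> {
    OUT timeout(Map<String, IN> pattern, long timeoutTimestamp) throws Exception;
}

public class TimeoutSketch {
    public static String describe(Map<String, String> partial, long ts) throws Exception {
        // Exactly one result per timed-out partial match; for multiple
        // results, PatternFlatTimeoutFunction is the right interface.
        PatternTimeoutFunction<String, String> fn =
            (pattern, timeoutTimestamp) ->
                "partial match '" + pattern.get("begin") + "' timed out at " + timeoutTimestamp;
        return fn.timeout(partial, ts);
    }

    public static void main(String[] args) throws Exception {
        System.out.println(describe(Map.of("begin", "event-1"), 42L));
        // partial match 'event-1' timed out at 42
    }
}
```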

http://git-wip-us.apache.org/repos/asf/flink/blob/57ef6c31/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
index 5824264..f769a2b 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
@@ -20,6 +20,7 @@ package org.apache.flink.cep.nfa;
 
 import com.google.common.collect.LinkedHashMultimap;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.cep.NonDuplicatingTypeSerializer;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
@@ -34,7 +35,6 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedList;
-import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Queue;
@@ -71,15 +71,22 @@ public class NFA<T> implements Serializable {
 	// Length of the window
 	private final long windowTime;
 
+	private final boolean handleTimeout;
+
 	// Current starting index for the next dewey version number
 	private int startEventCounter;
 
 	// Current set of computation states within the state machine
 	private transient Queue<ComputationState<T>> computationStates;
 
-	public NFA(final TypeSerializer<T> eventSerializer, final long windowTime) {
+	public NFA(
+		final TypeSerializer<T> eventSerializer,
+		final long windowTime,
+		final boolean handleTimeout) {
+
 		this.nonDuplicatingTypeSerializer = new NonDuplicatingTypeSerializer<>(eventSerializer);
 		this.windowTime = windowTime;
+		this.handleTimeout = handleTimeout;
 		sharedBuffer = new SharedBuffer<>(nonDuplicatingTypeSerializer);
 		computationStates = new LinkedList<>();
 
@@ -107,16 +114,19 @@ public class NFA<T> implements Serializable {
 
 	/**
 	 * Processes the next input event. If some of the computations reach a final state then the
-	 * resulting event sequences are returned.
+	 * resulting event sequences are returned. If computations time out and timeout handling is
+	 * activated, then the timed out event patterns are returned.
 	 *
 	 * @param event The current event to be processed
 	 * @param timestamp The timestamp of the current event
-	 * @return The collection of matched patterns (e.g. the result of computations which have
-	 * reached a final state)
+	 * @return Tuple of the collection of matched patterns (i.e. the result of computations which have
+	 * reached a final state) and the collection of timed out patterns (if timeout handling is
+	 * activated)
 	 */
-	public Collection<Map<String, T>> process(final T event, final long timestamp) {
+	public Tuple2<Collection<Map<String, T>>, Collection<Tuple2<Map<String, T>, Long>>> process(final T event, final long timestamp) {
 		final int numberComputationStates = computationStates.size();
-		final List<Map<String, T>> result = new ArrayList<>();
+		final Collection<Map<String, T>> result = new ArrayList<>();
+		final Collection<Tuple2<Map<String, T>, Long>> timeoutResult = new ArrayList<>();
 
 		// iterate over all current computations
 		for (int i = 0; i < numberComputationStates; i++) {
@@ -127,6 +137,16 @@ public class NFA<T> implements Serializable {
 			if (!computationState.isStartState() &&
 				windowTime > 0 &&
 				timestamp - computationState.getStartTimestamp() >= windowTime) {
+
+				if (handleTimeout) {
+					// extract the timed out event patterns
+					Collection<Map<String, T>> timeoutPatterns = extractPatternMatches(computationState);
+
+					for (Map<String, T> timeoutPattern : timeoutPatterns) {
+						timeoutResult.add(Tuple2.of(timeoutPattern, timestamp));
+					}
+				}
+
 				// remove computation state which has exceeded the window length
 				sharedBuffer.release(computationState.getState(), computationState.getEvent(), computationState.getTimestamp());
 				sharedBuffer.remove(computationState.getState(), computationState.getEvent(), computationState.getTimestamp());
@@ -150,24 +170,24 @@ public class NFA<T> implements Serializable {
 					computationStates.add(newComputationState);
 				}
 			}
+		}
 
-			// prune shared buffer based on window length
-			if(windowTime > 0) {
-				long pruningTimestamp = timestamp - windowTime;
-
-				// sanity check to guard against underflows
-				if (pruningTimestamp >= timestamp) {
-					throw new IllegalStateException("Detected an underflow in the pruning timestamp. This indicates that" +
-						" either the window length is too long (" + windowTime + ") or that the timestamp has not been" +
-						" set correctly (e.g. Long.MIN_VALUE).");
-				}
+		// prune shared buffer based on window length
+		if (windowTime > 0) {
+			long pruningTimestamp = timestamp - windowTime;
 
-				// remove all elements which are expired with respect to the window length
-				sharedBuffer.prune(pruningTimestamp);
+			// sanity check to guard against underflows
+			if (pruningTimestamp >= timestamp) {
+				throw new IllegalStateException("Detected an underflow in the pruning timestamp. This indicates that" +
+					" either the window length is too long (" + windowTime + ") or that the timestamp has not been" +
+					" set correctly (e.g. Long.MIN_VALUE).");
 			}
+
+			// remove all elements which are expired with respect to the window length
+			sharedBuffer.prune(pruningTimestamp);
 		}
 
-		return result;
+		return Tuple2.of(result, timeoutResult);
 	}
 
 	@Override

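The timeout branch added above partitions pending partial matches into still-alive and timed-out ones, and `process(...)` now returns both collections as a `Tuple2`. The following is a simplified, self-contained sketch of that logic; `PartialMatch`, `Result`, and `advance` are hypothetical stand-ins for the NFA's internal types, not the actual Flink classes:

```java
import java.util.ArrayList;
import java.util.List;

public class TimeoutSketch {

	// Stand-in for a partial match tracked by the NFA (hypothetical type).
	static final class PartialMatch {
		final List<String> events;
		final long startTimestamp;

		PartialMatch(List<String> events, long startTimestamp) {
			this.events = events;
			this.startTimestamp = startTimestamp;
		}
	}

	// Mirrors the Tuple2<matches, timeouts> return value introduced above.
	static final class Result {
		final List<PartialMatch> alive = new ArrayList<>();
		final List<PartialMatch> timedOut = new ArrayList<>();
	}

	// Partition pending matches, mirroring the
	// "timestamp - startTimestamp >= windowTime" check in the diff.
	static Result advance(List<PartialMatch> pending, long now, long windowTime,
			boolean handleTimeout) {
		Result r = new Result();
		for (PartialMatch m : pending) {
			if (windowTime > 0 && now - m.startTimestamp >= windowTime) {
				if (handleTimeout) {
					// expose the timed-out pattern instead of dropping it silently
					r.timedOut.add(m);
				}
				// in either case the expired state is pruned
			} else {
				r.alive.add(m);
			}
		}
		return r;
	}

	public static void main(String[] args) {
		List<PartialMatch> pending = new ArrayList<>();
		pending.add(new PartialMatch(List.of("start"), 0L));
		pending.add(new PartialMatch(List.of("start", "middle"), 90L));
		Result r = advance(pending, 100L, 50L, true);
		System.out.println(r.timedOut.size() + " " + r.alive.size());
	}
}
```

With `handleTimeout` set to false the expired state is still pruned, but nothing is reported, which matches the pre-patch behavior.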
http://git-wip-us.apache.org/repos/asf/flink/blob/57ef6c31/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
index f2561d4..878e0b2 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
@@ -48,11 +48,15 @@ public class NFACompiler {
 	 *
 	 * @param pattern Definition of sequence pattern
 	 * @param inputTypeSerializer Serializer for the input type
+	 * @param timeoutHandling True if the NFA shall return timed out event patterns
 	 * @param <T> Type of the input events
 	 * @return Non-deterministic finite automaton representing the given pattern
 	 */
-	public static <T> NFA<T> compile(Pattern<T, ?> pattern, TypeSerializer<T> inputTypeSerializer) {
-		NFAFactory<T> factory = compileFactory(pattern, inputTypeSerializer);
+	public static <T> NFA<T> compile(
+		Pattern<T, ?> pattern,
+		TypeSerializer<T> inputTypeSerializer,
+		boolean timeoutHandling) {
+		NFAFactory<T> factory = compileFactory(pattern, inputTypeSerializer, timeoutHandling);
 
 		return factory.createNFA();
 	}
@@ -63,14 +67,18 @@ public class NFACompiler {
 	 *
 	 * @param pattern Definition of sequence pattern
 	 * @param inputTypeSerializer Serializer for the input type
+	 * @param timeoutHandling True if the NFA shall return timed out event patterns
 	 * @param <T> Type of the input events
 	 * @return Factory for NFAs corresponding to the given pattern
 	 */
 	@SuppressWarnings("unchecked")
-	public static <T> NFAFactory<T> compileFactory(Pattern<T, ?> pattern, TypeSerializer<T> inputTypeSerializer) {
+	public static <T> NFAFactory<T> compileFactory(
+		Pattern<T, ?> pattern,
+		TypeSerializer<T> inputTypeSerializer,
+		boolean timeoutHandling) {
 		if (pattern == null) {
 			// return a factory for empty NFAs
-			return new NFAFactoryImpl<T>(inputTypeSerializer, 0, Collections.<State<T>>emptyList());
+			return new NFAFactoryImpl<T>(inputTypeSerializer, 0, Collections.<State<T>>emptyList(), timeoutHandling);
 		} else {
 			// set of all generated states
 			Map<String, State<T>> states = new HashMap<>();
@@ -137,10 +145,7 @@ public class NFACompiler {
 				(FilterFunction<T>) currentPattern.getFilterFunction()
 			));
 
-			NFA<T> nfa = new NFA<T>(inputTypeSerializer, windowTime);
-			nfa.addStates(states.values());
-
-			return new NFAFactoryImpl<T>(inputTypeSerializer, windowTime, new HashSet<>(states.values()));
+			return new NFAFactoryImpl<T>(inputTypeSerializer, windowTime, new HashSet<>(states.values()), timeoutHandling);
 		}
 	}
 
@@ -168,16 +173,23 @@ public class NFACompiler {
 		private final TypeSerializer<T> inputTypeSerializer;
 		private final long windowTime;
 		private final Collection<State<T>> states;
+		private final boolean timeoutHandling;
+
+		private NFAFactoryImpl(
+			TypeSerializer<T> inputTypeSerializer,
+			long windowTime,
+			Collection<State<T>> states,
+			boolean timeoutHandling) {
 
-		private NFAFactoryImpl(TypeSerializer<T> inputTypeSerializer, long windowTime, Collection<State<T>> states) {
 			this.inputTypeSerializer = inputTypeSerializer;
 			this.windowTime = windowTime;
 			this.states = states;
+			this.timeoutHandling = timeoutHandling;
 		}
 
 		@Override
 		public NFA<T> createNFA() {
-			NFA<T> result =  new NFA<>(inputTypeSerializer.duplicate(), windowTime);
+			NFA<T> result =  new NFA<>(inputTypeSerializer.duplicate(), windowTime, timeoutHandling);
 
 			result.addStates(states);
 

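Note the design choice in the NFACompiler change: `compileFactory` no longer builds an NFA eagerly but returns a serializable factory that captures the compiled states, the window time, and the new `timeoutHandling` flag, so every parallel operator instance can create its own NFA. A minimal sketch of that indirection, using hypothetical stand-in types rather than the real `State`/`NFA` classes:

```java
import java.io.Serializable;
import java.util.Collection;
import java.util.List;

public class FactorySketch {

	// Stand-in for a compiled NFA state (hypothetical).
	record State(String name) implements Serializable {}

	// Stand-in for the NFA; carries the flag threaded through by the patch.
	static final class Nfa implements Serializable {
		final Collection<State> states;
		final long windowTime;
		final boolean timeoutHandling;

		Nfa(Collection<State> states, long windowTime, boolean timeoutHandling) {
			this.states = states;
			this.windowTime = windowTime;
			this.timeoutHandling = timeoutHandling;
		}
	}

	// Serializable factory, analogous to NFACompiler.NFAFactory.
	interface NfaFactory extends Serializable {
		Nfa createNFA();
	}

	static NfaFactory compileFactory(Collection<State> states, long windowTime,
			boolean timeoutHandling) {
		// capture the immutable compile result; defer NFA construction
		return () -> new Nfa(states, windowTime, timeoutHandling);
	}

	public static void main(String[] args) {
		NfaFactory f = compileFactory(List.of(new State("start")), 1000L, true);
		Nfa a = f.createNFA();
		Nfa b = f.createNFA();
		System.out.println(a != b);            // each call yields a fresh NFA
		System.out.println(a.timeoutHandling); // flag survives the indirection
	}
}
```

This is also why the removed lines that constructed `new NFA<T>(...)` directly inside `compileFactory` disappear: the factory, not the compiler, is now responsible for instantiation.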
http://git-wip-us.apache.org/repos/asf/flink/blob/57ef6c31/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPBasePatternOperator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPBasePatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPBasePatternOperator.java
new file mode 100644
index 0000000..44649ac
--- /dev/null
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPBasePatternOperator.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.operator;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.cep.nfa.NFA;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import java.io.IOException;
+import java.util.PriorityQueue;
+
+/**
+ * Base class for CEP pattern operators. The operator uses a {@link NFA} to detect complex event
+ * patterns. The detected event patterns are then emitted to the downstream operators.
+ *
+ * @param <IN> Type of the input elements
+ * @param <OUT> Type of the output elements
+ */
+public abstract class AbstractCEPBasePatternOperator<IN, OUT>
+	extends AbstractStreamOperator<OUT>
+	implements OneInputStreamOperator<IN, OUT> {
+
+	private static final long serialVersionUID = -4166778210774160757L;
+
+	protected static final int INITIAL_PRIORITY_QUEUE_CAPACITY = 11;
+
+	private final TypeSerializer<IN> inputSerializer;
+	private final boolean isProcessingTime;
+
+	public AbstractCEPBasePatternOperator(
+			final TypeSerializer<IN> inputSerializer,
+			final boolean isProcessingTime) {
+		this.inputSerializer = inputSerializer;
+		this.isProcessingTime = isProcessingTime;
+	}
+
+	public TypeSerializer<IN> getInputSerializer() {
+		return inputSerializer;
+	}
+
+	protected abstract NFA<IN> getNFA() throws IOException;
+
+	protected abstract PriorityQueue<StreamRecord<IN>> getPriorityQueue() throws IOException;
+
+	@Override
+	public void processElement(StreamRecord<IN> element) throws Exception {
+		if (isProcessingTime) {
+			// there can be no out of order elements in processing time
+			NFA<IN> nfa = getNFA();
+			processEvent(nfa, element.getValue(), System.currentTimeMillis());
+		} else {
+			PriorityQueue<StreamRecord<IN>> priorityQueue = getPriorityQueue();
+
+			// event time processing
+			// we have to buffer the elements until we receive the proper watermark
+			if (getExecutionConfig().isObjectReuseEnabled()) {
+				// copy the StreamRecord so that it cannot be changed
+				priorityQueue.offer(new StreamRecord<IN>(inputSerializer.copy(element.getValue()), element.getTimestamp()));
+			} else {
+				priorityQueue.offer(element);
+			}
+		}
+	}
+
+	/**
+	 * Process the given event by giving it to the NFA and outputting the produced set of matched
+	 * event sequences.
+	 *
+	 * @param nfa NFA to be used for the event detection
+	 * @param event The current event to be processed
+	 * @param timestamp The timestamp of the event
+	 */
+	protected abstract void processEvent(NFA<IN> nfa, IN event, long timestamp);
+}

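The event-time branch of `processElement` above only buffers records; they are handed to the NFA later, once a watermark guarantees no earlier record can still arrive (see the `processWatermark` hunk in the next file). A simplified, hypothetical sketch of that buffer-and-drain pattern, with a plain record standing in for `StreamRecord`:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

public class WatermarkBufferSketch {

	// Stand-in for StreamRecord<IN> (hypothetical).
	record Record(String value, long timestamp) {}

	// Out-of-order records wait here, ordered by timestamp,
	// analogous to the operator's priority queue.
	private final PriorityQueue<Record> queue =
		new PriorityQueue<>(Comparator.comparingLong(Record::timestamp));

	// Records released to the NFA, in timestamp order.
	final List<Record> processed = new ArrayList<>();

	void processElement(Record r) {
		// buffer until the watermark passes the record's timestamp
		queue.offer(r);
	}

	void processWatermark(long watermark) {
		// drain everything at or before the watermark, in timestamp order
		while (!queue.isEmpty() && queue.peek().timestamp() <= watermark) {
			processed.add(queue.poll()); // stand-in for processEvent(nfa, ...)
		}
	}

	public static void main(String[] args) {
		WatermarkBufferSketch op = new WatermarkBufferSketch();
		op.processElement(new Record("b", 5));
		op.processElement(new Record("a", 2)); // arrives out of order
		op.processElement(new Record("c", 9));
		op.processWatermark(6);
		// "a" and "b" are released in timestamp order; "c" waits for a later watermark
		op.processed.forEach(r -> System.out.print(r.value()));
	}
}
```

The processing-time branch needs none of this: elements are fed to the NFA immediately with `System.currentTimeMillis()`, since there is no out-of-orderness to repair.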
http://git-wip-us.apache.org/repos/asf/flink/blob/57ef6c31/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPPatternOperator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPPatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPPatternOperator.java
index 6b087e3..753656f 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPPatternOperator.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPPatternOperator.java
@@ -20,89 +20,123 @@ package org.apache.flink.cep.operator;
 
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.cep.nfa.NFA;
-import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
-import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.cep.nfa.compiler.NFACompiler;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
+import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
 
 import java.io.IOException;
-import java.util.Collection;
-import java.util.Map;
+import java.io.InputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
 import java.util.PriorityQueue;
 
 /**
- * Base class for CEP pattern operator. The operator uses a {@link NFA} to detect complex event
- * patterns. The detected event patterns are then outputted to the down stream operators.
+ * Abstract CEP pattern operator used for non-keyed streams. Consequently, the operator
+ * state only comprises a single {@link NFA} and a priority queue to order out-of-order
+ * elements in case of event time processing.
  *
  * @param <IN> Type of the input elements
+ * @param <OUT> Type of the output elements
  */
-public abstract class AbstractCEPPatternOperator<IN>
-	extends AbstractStreamOperator<Map<String, IN>>
-	implements OneInputStreamOperator<IN, Map<String, IN>> {
+public abstract class AbstractCEPPatternOperator<IN, OUT> extends AbstractCEPBasePatternOperator<IN, OUT> {
+	private static final long serialVersionUID = 7487334510746595640L;
 
-	private static final long serialVersionUID = -4166778210774160757L;
+	private final StreamRecordSerializer<IN> streamRecordSerializer;
 
-	protected static final int INITIAL_PRIORITY_QUEUE_CAPACITY = 11;
+	// global nfa for all elements
+	private NFA<IN> nfa;
 
-	private final TypeSerializer<IN> inputSerializer;
-	private final boolean isProcessingTime;
+	// queue to buffer out of order stream records
+	private transient PriorityQueue<StreamRecord<IN>> priorityQueue;
 
 	public AbstractCEPPatternOperator(
-			final TypeSerializer<IN> inputSerializer,
-			final boolean isProcessingTime) {
-		this.inputSerializer = inputSerializer;
-		this.isProcessingTime = isProcessingTime;
+			TypeSerializer<IN> inputSerializer,
+			boolean isProcessingTime,
+			NFACompiler.NFAFactory<IN> nfaFactory) {
+		super(inputSerializer, isProcessingTime);
+
+		this.streamRecordSerializer = new StreamRecordSerializer<>(inputSerializer);
+		this.nfa = nfaFactory.createNFA();
 	}
 
-	public TypeSerializer<IN> getInputSerializer() {
-		return inputSerializer;
+	@Override
+	public void open() {
+		if (priorityQueue == null) {
+			priorityQueue = new PriorityQueue<StreamRecord<IN>>(INITIAL_PRIORITY_QUEUE_CAPACITY, new StreamRecordComparator<IN>());
+		}
 	}
 
-	protected abstract NFA<IN> getNFA() throws IOException;
+	@Override
+	protected NFA<IN> getNFA() throws IOException {
+		return nfa;
+	}
 
-	protected abstract PriorityQueue<StreamRecord<IN>> getPriorityQueue() throws IOException;
+	@Override
+	protected PriorityQueue<StreamRecord<IN>> getPriorityQueue() throws IOException {
+		return priorityQueue;
+	}
 
 	@Override
-	public void processElement(StreamRecord<IN> element) throws Exception {
-		if (isProcessingTime) {
-			// there can be no out of order elements in processing time
-			NFA<IN> nfa = getNFA();
-			processEvent(nfa, element.getValue(), System.currentTimeMillis());
-		} else {
-			PriorityQueue<StreamRecord<IN>> priorityQueue = getPriorityQueue();
-
-			// event time processing
-			// we have to buffer the elements until we receive the proper watermark
-			if (getExecutionConfig().isObjectReuseEnabled()) {
-				// copy the StreamRecord so that it cannot be changed
-				priorityQueue.offer(new StreamRecord<IN>(inputSerializer.copy(element.getValue()), element.getTimestamp()));
-			} else {
-				priorityQueue.offer(element);
-			}
+	public void processWatermark(Watermark mark) throws Exception {
+		while (!priorityQueue.isEmpty() && priorityQueue.peek().getTimestamp() <= mark.getTimestamp()) {
+			StreamRecord<IN> streamRecord = priorityQueue.poll();
+
+			processEvent(nfa, streamRecord.getValue(), streamRecord.getTimestamp());
 		}
+
+		output.emitWatermark(mark);
 	}
 
-	/**
-	 * Process the given event by giving it to the NFA and outputting the produced set of matched
-	 * event sequences.
-	 *
-	 * @param nfa NFA to be used for the event detection
-	 * @param event The current event to be processed
-	 * @param timestamp The timestamp of the event
-	 */
-	protected void processEvent(NFA<IN> nfa, IN event, long timestamp) {
-		Collection<Map<String, IN>> patterns = nfa.process(
-			event,
+	@Override
+	public StreamTaskState snapshotOperatorState(long checkpointId, long timestamp) throws Exception {
+		StreamTaskState taskState = super.snapshotOperatorState(checkpointId, timestamp);
+
+		final AbstractStateBackend.CheckpointStateOutputStream os = this.getStateBackend().createCheckpointStateOutputStream(
+			checkpointId,
 			timestamp);
 
-		if (!patterns.isEmpty()) {
-			StreamRecord<Map<String, IN>> streamRecord = new StreamRecord<Map<String, IN>>(
-				null,
-				timestamp);
+		final ObjectOutputStream oos = new ObjectOutputStream(os);
+		final AbstractStateBackend.CheckpointStateOutputView ov = new AbstractStateBackend.CheckpointStateOutputView(os);
+
+		oos.writeObject(nfa);
+
+		ov.writeInt(priorityQueue.size());
 
-			for (Map<String, IN> pattern : patterns) {
-				streamRecord.replace(pattern);
-				output.collect(streamRecord);
-			}
+		for (StreamRecord<IN> streamRecord: priorityQueue) {
+			streamRecordSerializer.serialize(streamRecord, ov);
 		}
+
+		taskState.setOperatorState(os.closeAndGetHandle());
+
+		return taskState;
+	}
+
+	@Override
+	@SuppressWarnings("unchecked")
+	public void restoreState(StreamTaskState state, long recoveryTimestamp) throws Exception {
+		super.restoreState(state, recoveryTimestamp);
+
+		StreamStateHandle stream = (StreamStateHandle)state.getOperatorState();
+
+		final InputStream is = stream.getState(getUserCodeClassloader());
+		final ObjectInputStream ois = new ObjectInputStream(is);
+		final DataInputViewStreamWrapper div = new DataInputViewStreamWrapper(is);
+
+		nfa = (NFA<IN>)ois.readObject();
+
+		int numberPriorityQueueEntries = div.readInt();
+
+		priorityQueue = new PriorityQueue<StreamRecord<IN>>(numberPriorityQueueEntries, new StreamRecordComparator<IN>());
+
+		for (int i = 0; i < numberPriorityQueueEntries; i++) {
+			priorityQueue.offer(streamRecordSerializer.deserialize(div));
+		}
+
+		div.close();
 	}
 }

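The snapshot/restore pair above follows a simple length-prefixed layout: write the queue size, then each buffered element; on restore, read the count back and rebuild the queue. A rough, assumed sketch of that scheme with `Long` timestamps standing in for serialized stream records (the real code additionally serializes the NFA object and goes through Flink's state backend streams):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.PriorityQueue;

public class SnapshotSketch {

	static byte[] snapshot(PriorityQueue<Long> queue) throws IOException {
		ByteArrayOutputStream bos = new ByteArrayOutputStream();
		DataOutputStream out = new DataOutputStream(bos);
		out.writeInt(queue.size());    // element count first, as in the diff
		for (Long ts : queue) {
			out.writeLong(ts);         // iteration order is unspecified, but the
		}                              // restored heap re-establishes the ordering
		out.flush();
		return bos.toByteArray();
	}

	static PriorityQueue<Long> restore(byte[] state) throws IOException {
		DataInputStream in = new DataInputStream(new ByteArrayInputStream(state));
		int n = in.readInt();
		// guard the capacity: PriorityQueue rejects an initial capacity of 0
		PriorityQueue<Long> queue = new PriorityQueue<>(Math.max(n, 1));
		for (int i = 0; i < n; i++) {
			queue.offer(in.readLong()); // re-heapify while rebuilding
		}
		return queue;
	}

	public static void main(String[] args) throws IOException {
		PriorityQueue<Long> q = new PriorityQueue<>();
		q.offer(5L);
		q.offer(2L);
		q.offer(9L);
		PriorityQueue<Long> restored = restore(snapshot(q));
		System.out.println(restored.size() + " " + restored.peek());
	}
}
```

Writing the count before the elements is what lets `restoreState` size the queue up front and know exactly how many records to deserialize without a sentinel value.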
