flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From trohrm...@apache.org
Subject flink git commit: [FLINK-3708] [cep] Add Scala CEP API
Date Tue, 26 Apr 2016 16:48:39 GMT
Repository: flink
Updated Branches:
  refs/heads/master 2bd6212a3 -> e29ac036a


[FLINK-3708] [cep] Add Scala CEP API

Added missing test dependency to pom.

Pattern in Scala API now uses Option to shield users against null values from the Java API

Added test-jar for build phase to pom.

Removed necessary dependency entry and an overlooked comment.

Added missing comments on FollowedByPattern.

CEPIT test update.

Replace CEP test with a simple test for Scala <-> Java interoperability.

This closes #1905.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e29ac036
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e29ac036
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e29ac036

Branch: refs/heads/master
Commit: e29ac036a76cc78ea608ffe1f0784ec15e351c60
Parents: 2bd6212
Author: Stefan Richter <stefanrichter83@gmail.com>
Authored: Mon Apr 18 13:10:09 2016 +0200
Committer: Till Rohrmann <trohrmann@apache.org>
Committed: Tue Apr 26 18:47:39 2016 +0200

----------------------------------------------------------------------
 flink-libraries/flink-cep-scala/pom.xml         | 114 +++++++++
 .../scala/org/apache/flink/cep/scala/CEP.scala  |  44 ++++
 .../apache/flink/cep/scala/PatternStream.scala  | 129 ++++++++++
 .../org/apache/flink/cep/scala/package.scala    |  46 ++++
 .../cep/scala/pattern/FollowedByPattern.scala   |  44 ++++
 .../flink/cep/scala/pattern/Pattern.scala       | 176 +++++++++++++
 .../flink/cep/scala/pattern/package.scala       |  39 +++
 .../cep/scala/CEPScalaAPICompletenessTest.scala |  46 ++++
 ...nStreamScalaJavaAPIInteroperabiliyTest.scala |  87 +++++++
 .../PatternScalaAPICompletenessTest.scala       |  44 ++++
 .../flink/cep/scala/pattern/PatternTest.scala   | 248 +++++++++++++++++++
 flink-libraries/flink-cep/pom.xml               | 120 +++++----
 .../org/apache/flink/cep/PatternStream.java     |  39 ++-
 .../flink/cep/pattern/AndFilterFunction.java    |   8 +
 .../org/apache/flink/cep/pattern/Pattern.java   |   6 +-
 flink-libraries/pom.xml                         |   1 +
 flink-streaming-scala/pom.xml                   |  13 +
 .../ScalaAPICompletenessTestBase.scala          |   4 +
 18 files changed, 1150 insertions(+), 58 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e29ac036/flink-libraries/flink-cep-scala/pom.xml
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep-scala/pom.xml b/flink-libraries/flink-cep-scala/pom.xml
new file mode 100644
index 0000000..1b0caee
--- /dev/null
+++ b/flink-libraries/flink-cep-scala/pom.xml
@@ -0,0 +1,114 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.flink</groupId>
+        <artifactId>flink-libraries</artifactId>
+        <version>1.1-SNAPSHOT</version>
+        <relativePath>..</relativePath>
+    </parent>
+    <artifactId>flink-cep-scala_2.10</artifactId>
+    <name>flink-cep-scala</name>
+    <packaging>jar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-cep_2.10</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-streaming-scala_2.10</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <!-- We need to add this explicitly because through shading the dependency on asm seems
+        to go away. TODO -->
+        <dependency>
+            <groupId>org.ow2.asm</groupId>
+            <artifactId>asm</artifactId>
+            <version>${asm.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-tests_2.10</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+            <type>test-jar</type>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-test-utils_2.10</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-streaming-java_2.10</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+            <type>test-jar</type>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-streaming-scala_2.10</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+            <type>test-jar</type>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-cep_2.10</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+            <type>test-jar</type>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <!-- Scala Compiler -->
+            <plugin>
+                <groupId>net.alchim31.maven</groupId>
+                <artifactId>scala-maven-plugin</artifactId>
+                <version>3.2.2</version>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>compile</goal>
+                            <goal>testCompile</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+			<!-- Scala Code Style, most of the configuration done via plugin management -->
+			<plugin>
+				<groupId>org.scalastyle</groupId>
+				<artifactId>scalastyle-maven-plugin</artifactId>
+				<configuration>
+					<configLocation>${project.basedir}/../../tools/maven/scalastyle-config.xml</configLocation>
+				</configuration>
+			</plugin>
+        </plugins>
+    </build>
+    
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/e29ac036/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/CEP.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/CEP.scala b/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/CEP.scala
new file mode 100644
index 0000000..f28aee2
--- /dev/null
+++ b/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/CEP.scala
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.cep.scala
+
+import org.apache.flink.cep.scala.pattern.Pattern
+import org.apache.flink.cep.{CEP => JCEP}
+import org.apache.flink.streaming.api.scala.DataStream
+
+/**
+  * Utility method to transform a [[DataStream]] into a [[PatternStream]] to do CEP.
+  */
+
+object CEP {
+  /**
+    * Transforms a [[DataStream]] into a [[PatternStream]] in the Scala API.
+    * See [[org.apache.flink.cep.CEP}]]for a more detailed description how the underlying
+    * Java API works.
+    *
+    * @param input   DataStream containing the input events
+    * @param pattern Pattern specification which shall be detected
+    * @tparam T Type of the input events
+    * @return Resulting pattern stream
+    */
+  def pattern[T](input: DataStream[T],
+                                  pattern: Pattern[T, _]): PatternStream[T] = {
+    wrapPatternStream(JCEP.pattern(input.javaStream, pattern.wrappedPattern))
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/e29ac036/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/PatternStream.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/PatternStream.scala b/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/PatternStream.scala
new file mode 100644
index 0000000..22b105c
--- /dev/null
+++ b/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/PatternStream.scala
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.cep.scala
+
+import java.util.{Map => JMap}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.cep.{PatternFlatSelectFunction, PatternSelectFunction,
+PatternStream => JPatternStream}
+import org.apache.flink.streaming.api.scala.DataStream
+import org.apache.flink.streaming.api.scala.asScalaStream
+import org.apache.flink.util.Collector
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+/**
+  * Stream abstraction for CEP pattern detection. A pattern stream is a stream which emits detected
+  * pattern sequences as a map of events associated with their names. The pattern is detected using
+  * a [[org.apache.flink.cep.nfa.NFA]]. In order to process the detected sequences, the user has to
+  * specify a [[PatternSelectFunction]] or a [[PatternFlatSelectFunction]].
+  *
+  * @param jPatternStream Underlying pattern stream from Java API
+  * @tparam T Type of the events
+  */
+class PatternStream[T](jPatternStream: JPatternStream[T]) {
+
+  private[flink] def wrappedPatternStream = jPatternStream
+
+  /**
+    * Applies a select function to the detected pattern sequence. For each pattern sequence the
+    * provided [[PatternSelectFunction]] is called. The pattern select function can produce
+    * exactly one resulting element.
+    *
+    * @param patternSelectFunction The pattern select function which is called for each detected
+    *                              pattern sequence.
+    * @tparam R Type of the resulting elements
+    * @return [[DataStream]] which contains the resulting elements from the pattern select function.
+    */
+  def select[R: TypeInformation](patternSelectFunction: PatternSelectFunction[T, R])
+  : DataStream[R] = {
+    asScalaStream(jPatternStream.select(patternSelectFunction, implicitly[TypeInformation[R]]))
+  }
+
+  /**
+    * Applies a flat select function to the detected pattern sequence. For each pattern sequence
+    * the provided [[PatternFlatSelectFunction]] is called. The pattern flat select function can
+    * produce an arbitrary number of resulting elements.
+    *
+    * @param patternFlatSelectFunction The pattern flat select function which is called for each
+    *                                  detected pattern sequence.
+    * @tparam R Type of the resulting elements
+    * @return [[DataStream]] which contains the resulting elements from the pattern flat select
+    *         function.
+    */
+  def flatSelect[R: TypeInformation](patternFlatSelectFunction: PatternFlatSelectFunction[T, R])
+  : DataStream[R] = {
+    asScalaStream(jPatternStream
+      .flatSelect(patternFlatSelectFunction, implicitly[TypeInformation[R]]))
+  }
+
+  /**
+    * Applies a select function to the detected pattern sequence. For each pattern sequence the
+    * provided [[PatternSelectFunction]] is called. The pattern select function can produce exactly
+    * one resulting element.
+    *
+    * @param patternSelectFun The pattern select function which is called for each detected
+    *                         pattern sequence.
+    * @tparam R Type of the resulting elements
+    * @return [[DataStream]] which contains the resulting elements from the pattern select function.
+    */
+  def select[R: TypeInformation](patternSelectFun: mutable.Map[String, T] => R): DataStream[R] = {
+    val patternSelectFunction: PatternSelectFunction[T, R] = new PatternSelectFunction[T, R] {
+      val cleanFun = cleanClosure(patternSelectFun)
+
+      def select(in: JMap[String, T]): R = cleanFun(in.asScala)
+    }
+    select(patternSelectFunction)
+  }
+
+  /**
+    * Applies a flat select function to the detected pattern sequence. For each pattern sequence
+    * the provided [[PatternFlatSelectFunction]] is called. The pattern flat select function
+    * can produce an arbitrary number of resulting elements.
+    *
+    * @param patternFlatSelectFun The pattern flat select function which is called for each
+    *                             detected pattern sequence.
+    * @tparam R Type of the resulting elements
+    * @return [[DataStream]] which contains the resulting elements from the pattern flat select
+    *         function.
+    */
+  def flatSelect[R: TypeInformation](patternFlatSelectFun: (mutable.Map[String, T],
+    Collector[R]) => Unit): DataStream[R] = {
+    val patternFlatSelectFunction: PatternFlatSelectFunction[T, R] =
+      new PatternFlatSelectFunction[T, R] {
+        val cleanFun = cleanClosure(patternFlatSelectFun)
+
+        def flatSelect(pattern: JMap[String, T], out: Collector[R]): Unit =
+          cleanFun(pattern.asScala, out)
+      }
+    flatSelect(patternFlatSelectFunction)
+  }
+
+}
+
+object PatternStream {
+  /**
+    *
+    * @param jPatternStream Underlying pattern stream from Java API
+    * @tparam T Type of the events
+    * @return A new pattern stream wrapping the pattern stream from Java APU
+    */
+  def apply[T](jPatternStream: JPatternStream[T]) = {
+    new PatternStream[T](jPatternStream)
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e29ac036/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/package.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/package.scala b/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/package.scala
new file mode 100644
index 0000000..179174b
--- /dev/null
+++ b/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/package.scala
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.cep
+
+import org.apache.flink.api.scala.ClosureCleaner
+import org.apache.flink.cep.{PatternStream => JPatternStream}
+
+package object scala {
+
+  /**
+    * Utility method to wrap [[org.apache.flink.cep.PatternStream]] for usage with the Scala API.
+    *
+    * @param javaPatternStream The underlying pattern stream from the Java API
+    * @tparam T Type of the events
+    * @return A pattern stream from the Scala API which wraps a pattern stream from the Java API
+    */
+  private[flink] def wrapPatternStream[T](javaPatternStream: JPatternStream[T])
+  : scala.PatternStream[T] = {
+    Option(javaPatternStream) match {
+      case Some(p) => PatternStream[T](p)
+      case None =>
+        throw new IllegalArgumentException("PatternStream from Java API must not be null.")
+    }
+  }
+
+  private[flink] def cleanClosure[F <: AnyRef](f: F, checkSerializable: Boolean = true): F = {
+    ClosureCleaner.clean(f, checkSerializable)
+    return f
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/e29ac036/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/FollowedByPattern.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/FollowedByPattern.scala b/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/FollowedByPattern.scala
new file mode 100644
index 0000000..4bda08f
--- /dev/null
+++ b/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/FollowedByPattern.scala
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.cep.scala.pattern
+
+import org.apache.flink.cep.pattern.{FollowedByPattern => JFollowedByPattern}
+
+object FollowedByPattern {
+  /**
+    * Constructs a new Pattern by wrapping a given Java API Pattern
+    *
+    * @param jfbPattern Underlying Java API Pattern.
+    * @tparam T Base type of the elements appearing in the pattern
+    * @tparam F Subtype of T to which the current pattern operator is constrained
+    * @return New wrapping FollowedByPattern object
+    */
+  def apply[T, F <: T](jfbPattern: JFollowedByPattern[T, F]) =
+    new FollowedByPattern[T, F](jfbPattern)
+}
+
+/**
+  * Pattern operator which signifies that the there is a non-strict temporal contiguity between
+  * itself and its preceding pattern operator. This means that there might be events in between
+  * two matching events. These events are then simply ignored.
+  *
+  * @tparam T Base type of the events
+  * @tparam F Subtype of T to which the operator is currently constrained
+  */
+class FollowedByPattern[T, F <: T](jfbPattern: JFollowedByPattern[T, F])
+  extends Pattern[T, F](jfbPattern)

http://git-wip-us.apache.org/repos/asf/flink/blob/e29ac036/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala b/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala
new file mode 100644
index 0000000..592599c
--- /dev/null
+++ b/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.cep.scala.pattern
+
+import org.apache.flink.api.common.functions.FilterFunction
+import org.apache.flink.cep
+import org.apache.flink.cep.pattern.{Pattern => JPattern}
+import org.apache.flink.streaming.api.windowing.time.Time
+
+/**
+  * Base class for a pattern definition.
+  *
+  * A pattern definition is used by [[org.apache.flink.cep.nfa.compiler.NFACompiler]] to create
+  * a [[org.apache.flink.cep.nfa.NFA]].
+  *
+  * {{{
+  * Pattern<T, F> pattern = Pattern.<T>begin("start")
+  * .next("middle").subtype(F.class)
+  * .followedBy("end").where(new MyFilterFunction());
+  * }
+  * }}}
+  *
+  * @param jPattern Underlying Java API Pattern
+  * @tparam T Base type of the elements appearing in the pattern
+  * @tparam F Subtype of T to which the current pattern operator is constrained
+  */
+class Pattern[T , F <: T](jPattern: JPattern[T, F]) {
+
+  private[flink] def wrappedPattern = jPattern
+
+  /**
+    *
+    * @return Name of the pattern operator
+    */
+  def getName(): String = jPattern.getName()
+
+  /**
+    *
+    * @return Window length in which the pattern match has to occur
+    */
+  def getWindowTime(): Option[Time] = {
+    Option(jPattern.getWindowTime())
+  }
+
+  /**
+    *
+    * @return Filter condition for an event to be matched
+    */
+  def getFilterFunction(): Option[FilterFunction[F]] = {
+    Option(jPattern.getFilterFunction())
+  }
+
+  /**
+    * Applies a subtype constraint on the current pattern operator. This means that an event has
+    * to be of the given subtype in order to be matched.
+    *
+    * @param clazz Class of the subtype
+    * @tparam S Type of the subtype
+    * @return The same pattern operator with the new subtype constraint
+    */
+  def subtype[S <: F](clazz: Class[S]): Pattern[T, S] = {
+    jPattern.subtype(clazz)
+    this.asInstanceOf[Pattern[T, S]]
+  }
+
+  /**
+    * Defines the maximum time interval for a matching pattern. This means that the time gap
+    * between first and the last event must not be longer than the window time.
+    *
+    * @param windowTime Time of the matching window
+    * @return The same pattern operator with the new window length
+    */
+  def within(windowTime: Time): Pattern[T, F] = {
+    jPattern.within(windowTime)
+    this
+  }
+
+  /**
+    * Appends a new pattern operator to the existing one. The new pattern operator enforces strict
+    * temporal contiguity. This means that the whole pattern only matches if an event which matches
+    * this operator directly follows the preceding matching event. Thus, there cannot be any
+    * events in between two matching events.
+    *
+    * @param name Name of the new pattern operator
+    * @return A new pattern operator which is appended to this pattern operator
+    */
+  def next(name: String): Pattern[T, T] = {
+    Pattern[T, T](jPattern.next(name))
+  }
+
+  /**
+    * Appends a new pattern operator to the existing one. The new pattern operator enforces
+    * non-strict temporal contiguity. This means that a matching event of this operator and the
+    * preceding matching event might be interleaved with other events which are ignored.
+    *
+    * @param name Name of the new pattern operator
+    * @return A new pattern operator which is appended to this pattern operator
+    */
+  def followedBy(name: String): FollowedByPattern[T, T] = {
+    FollowedByPattern(jPattern.followedBy(name))
+  }
+
+  /**
+    * Specifies a filter condition which has to be fulfilled by an event in order to be matched.
+    *
+    * @param filter Filter condition
+    * @return The same pattern operator where the new filter condition is set
+    */
+  def where(filter: FilterFunction[F]): Pattern[T, F] = {
+    jPattern.where(filter)
+    this
+  }
+
+  /**
+    * Specifies a filter condition which has to be fulfilled by an event in order to be matched.
+    *
+    * @param filterFun Filter condition
+    * @return The same pattern operator where the new filter condition is set
+    */
+  def where(filterFun: F => Boolean): Pattern[T, F] = {
+    val filter = new FilterFunction[F] {
+      val cleanFilter = cep.scala.cleanClosure(filterFun)
+
+      override def filter(value: F): Boolean = cleanFilter(value)
+    }
+    where(filter)
+  }
+
+  /**
+    *
+    * @return The previous pattern operator
+    */
+  def getPrevious(): Option[Pattern[T, _ <: T]] = {
+    wrapPattern(jPattern.getPrevious())
+  }
+
+}
+
+object Pattern {
+
+  /**
+    * Constructs a new Pattern by wrapping a given Java API Pattern
+    *
+    * @param jPattern Underlying Java API Pattern.
+    * @tparam T Base type of the elements appearing in the pattern
+    * @tparam F Subtype of T to which the current pattern operator is constrained
+    * @return New wrapping Pattern object
+    */
+  def apply[T, F <: T](jPattern: JPattern[T, F]) = new Pattern[T, F](jPattern)
+
+  /**
+    * Starts a new pattern with the initial pattern operator whose name is provided. Furthermore,
+    * the base type of the event sequence is set.
+    *
+    * @param name Name of the new pattern operator
+    * @tparam X Base type of the event pattern
+    * @return The first pattern operator of a pattern
+    */
+  def begin[X](name: String): Pattern[X, X] = Pattern(JPattern.begin(name))
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e29ac036/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/package.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/package.scala b/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/package.scala
new file mode 100644
index 0000000..382c160
--- /dev/null
+++ b/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/package.scala
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.cep.scala
+
+import org.apache.flink.cep.pattern.{FollowedByPattern => JFollowedByPattern, Pattern => JPattern}
+
+package object pattern {
+  /**
+    * Utility method to wrap [[org.apache.flink.cep.pattern.Pattern]] and its subclasses
+    * for usage with the Scala API.
+    *
+    * @param javaPattern The underlying pattern from the Java API
+    * @tparam T Base type of the elements appearing in the pattern
+    * @tparam F Subtype of T to which the current pattern operator is constrained
+    * @return A pattern from the Scala API which wraps the pattern from the Java API
+    */
+  private[flink] def wrapPattern[T, F <: T](javaPattern: JPattern[T, F])
+  : Option[Pattern[T, F]] = javaPattern match {
+    case f: JFollowedByPattern[T, F] => Some(FollowedByPattern[T, F](f))
+    case p: JPattern[T, F] => Some(Pattern[T, F](p))
+    case _ => None
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/e29ac036/flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/CEPScalaAPICompletenessTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/CEPScalaAPICompletenessTest.scala b/flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/CEPScalaAPICompletenessTest.scala
new file mode 100644
index 0000000..560d99a
--- /dev/null
+++ b/flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/CEPScalaAPICompletenessTest.scala
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.cep.scala
+
+import java.lang.reflect.Method
+
+import org.apache.flink.api.scala.completeness.ScalaAPICompletenessTestBase
+import org.apache.flink.cep.{PatternStream => JPatternStream}
+import org.junit.Test
+
+import scala.language.existentials
+
+/**
+ * This checks whether the CEP Scala API is up to feature parity with the Java API.
+ * Implements the [[ScalaAPICompletenessTestBase]] for CEP.
+ */
+class CEPScalaAPICompletenessTest extends ScalaAPICompletenessTestBase {
+
+  override def isExcludedByName(method: Method): Boolean = {
+    val name = method.getDeclaringClass().getName() + "." + method.getName()
+    val excludedNames = Seq()
+    excludedNames.contains(name)
+  }
+
+  @Test
+  override def testCompleteness(): Unit = {
+    checkMethods("PatternStream", "PatternStream",
+      classOf[JPatternStream[_]],
+      classOf[PatternStream[_]])
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e29ac036/flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/PatternStreamScalaJavaAPIInteroperabiliyTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/PatternStreamScalaJavaAPIInteroperabiliyTest.scala b/flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/PatternStreamScalaJavaAPIInteroperabiliyTest.scala
new file mode 100644
index 0000000..7daebfe
--- /dev/null
+++ b/flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/PatternStreamScalaJavaAPIInteroperabiliyTest.scala
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.cep.scala
+
+import org.apache.flink.api.common.functions.util.ListCollector
+import org.apache.flink.cep.scala.pattern.Pattern
+import org.apache.flink.streaming.api.operators.{StreamFlatMap, StreamMap}
+import org.apache.flink.streaming.api.scala._
+import org.apache.flink.streaming.api.transformations.OneInputTransformation
+import org.apache.flink.util.{Collector, TestLogger}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import org.junit.Assert._
+import org.junit.Test
+
+class PatternStreamScalaJavaAPIInteroperabiliyTest extends TestLogger {
+
+  @Test
+  @throws[Exception]
+  def testScalaJavaAPISelectFunForwarding {
+    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
+    val dummyDataStream: DataStream[(Int, Int)] = env.fromElements()
+    val pattern: Pattern[(Int, Int), _] = Pattern.begin[(Int, Int)]("dummy")
+    val pStream: PatternStream[(Int, Int)] = CEP.pattern(dummyDataStream, pattern)
+    val param = mutable.Map("begin" ->(1, 2)).asJava
+    val result: DataStream[(Int, Int)] = pStream
+      .select((pattern: mutable.Map[String, (Int, Int)]) => {
+        //verifies input parameter forwarding
+        assertEquals(param, pattern.asJava)
+        param.get("begin")
+      })
+    val out = extractUserFunction[StreamMap[java.util.Map[String, (Int, Int)], (Int, Int)]](result)
+      .getUserFunction.map(param)
+    //verifies output parameter forwarding
+    assertEquals(param.get("begin"), out)
+  }
+
+  @Test
+  @throws[Exception]
+  def testScalaJavaAPIFlatSelectFunForwarding {
+    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
+    val dummyDataStream: DataStream[List[Int]] = env.fromElements()
+    val pattern: Pattern[List[Int], _] = Pattern.begin[List[Int]]("dummy")
+    val pStream: PatternStream[List[Int]] = CEP.pattern(dummyDataStream, pattern)
+    val inList = List(1, 2, 3)
+    val inParam = mutable.Map("begin" -> inList).asJava
+    val outList = new java.util.ArrayList[List[Int]]
+    val outParam = new ListCollector[List[Int]](outList)
+
+    val result: DataStream[List[Int]] = pStream
+
+      .flatSelect((pattern: mutable.Map[String, List[Int]], out: Collector[List[Int]]) => {
+        //verifies input parameter forwarding
+        assertEquals(inParam, pattern.asJava)
+        out.collect(pattern.get("begin").get)
+      })
+
+    extractUserFunction[StreamFlatMap[java.util.Map[String, List[Int]], List[Int]]](result).
+      getUserFunction.flatMap(inParam, outParam)
+    //verify output parameter forwarding and that flatMap function was actually called
+    assertEquals(inList, outList.get(0))
+  }
+
+  def extractUserFunction[T](dataStream: DataStream[_]) = {
+    dataStream.javaStream
+      .getTransformation
+      .asInstanceOf[OneInputTransformation[_, _]]
+      .getOperator
+      .asInstanceOf[T]
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e29ac036/flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/pattern/PatternScalaAPICompletenessTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/pattern/PatternScalaAPICompletenessTest.scala b/flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/pattern/PatternScalaAPICompletenessTest.scala
new file mode 100644
index 0000000..ac72bdf
--- /dev/null
+++ b/flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/pattern/PatternScalaAPICompletenessTest.scala
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.cep.scala.pattern
+
+import java.lang.reflect.Method
+
+import org.apache.flink.api.scala.completeness.ScalaAPICompletenessTestBase
+import org.apache.flink.cep.pattern.{Pattern => JPattern}
+import org.junit.Test
+
+import scala.language.existentials
+
+/**
+ * This checks whether the CEP Scala API is up to feature parity with the Java API.
+ * Implements the [[ScalaAPICompletenessTestBase]] for CEP.
+ */
+class PatternScalaAPICompletenessTest extends ScalaAPICompletenessTestBase {
+
+  override def isExcludedByName(method: Method): Boolean = {
+    val name = method.getDeclaringClass.getName + "." + method.getName
+    val excludedNames = Seq()
+    excludedNames.contains(name)
+  }
+
+  @Test
+  override def testCompleteness(): Unit = {
+    checkMethods("Pattern", "Pattern", classOf[JPattern[_, _]], classOf[Pattern[_, _]])
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e29ac036/flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/pattern/PatternTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/pattern/PatternTest.scala b/flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/pattern/PatternTest.scala
new file mode 100644
index 0000000..5f49031
--- /dev/null
+++ b/flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/pattern/PatternTest.scala
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.cep.scala.pattern
+
+import org.apache.flink.api.common.functions.FilterFunction
+import org.apache.flink.cep.pattern.{AndFilterFunction, SubtypeFilterFunction, Pattern => JPattern}
+import org.junit.Assert._
+import org.junit.Test
+import org.apache.flink.cep.Event
+import org.apache.flink.cep.SubEvent
+
+class PatternTest {
+
+  /**
+    * These tests simply check that the pattern construction completes without failure and that the
+    * Scala API pattern is synchronous with its wrapped Java API pattern.
+    */
+
+  @Test
+  def testStrictContiguity: Unit = {
+    val pattern = Pattern.begin[Event]("start").next("next").next("end")
+    val jPattern = JPattern.begin[Event]("start").next("next").next("end")
+
+
+    assertTrue(checkCongruentRepresentations(pattern, jPattern))
+    assertTrue(checkCongruentRepresentations(wrapPattern(jPattern).get, jPattern))
+
+    assertTrue(checkCongruentRepresentations(pattern, pattern.wrappedPattern))
+    val previous = pattern.getPrevious.orNull
+    val preprevious = previous.getPrevious.orNull
+
+    assertTrue(pattern.getPrevious.isDefined)
+    assertTrue(previous.getPrevious.isDefined)
+    assertFalse(preprevious.getPrevious.isDefined)
+
+    assertEquals(pattern.getName, "end")
+    assertEquals(previous.getName, "next")
+    assertEquals(preprevious.getName, "start")
+  }
+
+
+  @Test
+  def testNonStrictContiguity: Unit = {
+    val pattern = Pattern.begin[Event]("start").followedBy("next").followedBy("end")
+    val jPattern = JPattern.begin[Event]("start").followedBy("next").followedBy("end")
+
+    assertTrue(checkCongruentRepresentations(pattern, jPattern))
+    assertTrue(checkCongruentRepresentations(wrapPattern(jPattern).get, jPattern))
+    val previous = pattern.getPrevious.orNull
+    val preprevious = previous.getPrevious.orNull
+
+    assertTrue(pattern.getPrevious.isDefined)
+    assertTrue(previous.getPrevious.isDefined)
+    assertFalse(preprevious.getPrevious.isDefined)
+
+    assertTrue(pattern.isInstanceOf[FollowedByPattern[_, _]])
+    assertTrue(previous.isInstanceOf[FollowedByPattern[_, _]])
+
+    assertEquals(pattern.getName, "end")
+    assertEquals(previous.getName, "next")
+    assertEquals(preprevious.getName, "start")
+  }
+
+  @Test
+  def testStrictContiguityWithCondition: Unit = {
+    val pattern = Pattern.begin[Event]("start")
+      .next("next")
+      .where((value: Event) => value.getName() == "foobar")
+      .next("end")
+      .where((value: Event) => value.getId() == 42)
+
+    val jPattern = JPattern.begin[Event]("start")
+      .next("next")
+      .where(new FilterFunction[Event]() {
+        @throws[Exception]
+        def filter(value: Event): Boolean = {
+          return value.getName() == "foobar"
+        }
+      }).next("end")
+      .where(new FilterFunction[Event]() {
+        @throws[Exception]
+        def filter(value: Event): Boolean = {
+          return value.getId() == 42
+        }
+      })
+
+    assertTrue(checkCongruentRepresentations(pattern, jPattern))
+    assertTrue(checkCongruentRepresentations(wrapPattern(jPattern).get, jPattern))
+
+    val previous = pattern.getPrevious.orNull
+    val preprevious = previous.getPrevious.orNull
+
+    assertTrue(pattern.getPrevious.isDefined)
+    assertTrue(previous.getPrevious.isDefined)
+    assertFalse(preprevious.getPrevious.isDefined)
+
+    assertTrue(pattern.getFilterFunction.isDefined)
+    assertTrue(previous.getFilterFunction.isDefined)
+    assertFalse(preprevious.getFilterFunction.isDefined)
+
+    assertEquals(pattern.getName, "end")
+    assertEquals(previous.getName, "next")
+    assertEquals(preprevious.getName, "start")
+  }
+
+  @Test
+  def testPatternWithSubtyping: Unit = {
+    val pattern = Pattern.begin[Event]("start")
+      .next("subevent")
+      .subtype(classOf[SubEvent])
+      .followedBy("end")
+
+    val jPattern = JPattern.begin[Event]("start")
+      .next("subevent")
+      .subtype(classOf[SubEvent])
+      .followedBy("end")
+
+    assertTrue(checkCongruentRepresentations(pattern, jPattern))
+    assertTrue(checkCongruentRepresentations(wrapPattern(jPattern).get, jPattern))
+
+    val previous = pattern.getPrevious.orNull
+    val preprevious = previous.getPrevious.orNull
+
+    assertTrue(pattern.getPrevious.isDefined)
+    assertTrue(previous.getPrevious.isDefined)
+    assertFalse(preprevious.getPrevious.isDefined)
+
+    assertTrue(previous.getFilterFunction.isDefined)
+    assertTrue(previous.getFilterFunction.get.isInstanceOf[SubtypeFilterFunction[_]])
+
+    assertEquals(pattern.getName, "end")
+    assertEquals(previous.getName, "subevent")
+    assertEquals(preprevious.getName, "start")
+  }
+
+  @Test
+  def testPatternWithSubtypingAndFilter: Unit = {
+    val pattern = Pattern.begin[Event]("start")
+      .next("subevent")
+      .subtype(classOf[SubEvent])
+      .where((value: SubEvent) => false)
+      .followedBy("end")
+
+    val jpattern = JPattern.begin[Event]("start")
+      .next("subevent")
+      .subtype(classOf[SubEvent])
+      .where(new FilterFunction[SubEvent]() {
+        @throws[Exception]
+        def filter(value: SubEvent): Boolean = {
+          return false
+        }
+      }).followedBy("end")
+
+    assertTrue(checkCongruentRepresentations(pattern, jpattern))
+    assertTrue(checkCongruentRepresentations(wrapPattern(jpattern).get, jpattern))
+
+
+    val previous = pattern.getPrevious.orNull
+    val preprevious = previous.getPrevious.orNull
+
+    assertTrue(pattern.getPrevious.isDefined)
+    assertTrue(previous.getPrevious.isDefined)
+    assertFalse(preprevious.getPrevious.isDefined)
+
+    assertTrue(pattern.isInstanceOf[FollowedByPattern[_, _]])
+    assertTrue(previous.getFilterFunction.isDefined)
+
+    assertEquals(pattern.getName, "end")
+    assertEquals(previous.getName, "subevent")
+    assertEquals(preprevious.getName, "start")
+  }
+
+  def checkCongruentRepresentations[T, _ <: T](pattern: Pattern[T, _ <: T],
+                                               jPattern: JPattern[T, _ <: T]): Boolean = {
+    ((pattern == null && jPattern == null)
+      || (pattern != null && jPattern != null)
+      //check equal pattern names
+      && threeWayEquals(
+      pattern.getName,
+      pattern.wrappedPattern.getName,
+      jPattern.getName())
+      //check equal time windows
+      && threeWayEquals(
+      pattern.getWindowTime.orNull,
+      pattern.wrappedPattern.getWindowTime,
+      jPattern.getWindowTime())
+      //check congruent class names / types
+      && threeWayEquals(
+      pattern.getClass.getSimpleName,
+      pattern.wrappedPattern.getClass.getSimpleName,
+      jPattern.getClass().getSimpleName())
+      //best effort to confirm congruent filter functions
+      && compareFilterFunctions(
+      pattern.getFilterFunction.orNull,
+      jPattern.getFilterFunction())
+      //recursively check previous patterns
+      && checkCongruentRepresentations(
+      pattern.getPrevious.orNull,
+      jPattern.getPrevious()))
+  }
+
+  def threeWayEquals(a: AnyRef, b: AnyRef, c: AnyRef): Boolean = {
+    a == b && b == c
+  }
+
+  def compareFilterFunctions(sFilter: FilterFunction[_], jFilter: FilterFunction[_]): Boolean = {
+    /**
+      * We would like to simply compare the filter functions like this:
+      *
+      * {{{(pattern.getFilterFunction.orNull == jPattern.getFilterFunction)}}}
+      *
+      * However, the closure cleaning makes comparing filter functions by reference impossible.
+      * Testing for functional equivalence is an undecidable problem. Thus, for do a best effort by
+      * simply matching the presence/absence and known classes of filter functions in the patterns.
+      */
+    (sFilter, jFilter) match {
+      //matching types: and-filter; branch and recurse for inner filters
+      case (saf: AndFilterFunction[_], jaf: AndFilterFunction[_])
+      => (compareFilterFunctions(saf.getLeft(), jaf.getLeft())
+        && compareFilterFunctions(saf.getRight(), jaf.getRight()))
+      //matching types: subtype-filter
+      case (saf: SubtypeFilterFunction[_], jaf: SubtypeFilterFunction[_]) => true
+      //mismatch: one-sided and/subtype-filter
+      case (_: AndFilterFunction[_] | _: SubtypeFilterFunction[_], _) => false
+      case (_, _: AndFilterFunction[_] | _: SubtypeFilterFunction[_]) => false
+      //from here we can only check mutual presence or absence of a function
+      case (s: FilterFunction[_], j: FilterFunction[_]) => true
+      case (null, null) => true
+      case _ => false
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e29ac036/flink-libraries/flink-cep/pom.xml
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/pom.xml b/flink-libraries/flink-cep/pom.xml
index dee7254..7932e14 100644
--- a/flink-libraries/flink-cep/pom.xml
+++ b/flink-libraries/flink-cep/pom.xml
@@ -17,63 +17,79 @@ KIND, either express or implied.  See the License for the
 specific language governing permissions and limitations
 under the License.
 -->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
 
-	<modelVersion>4.0.0</modelVersion>
+    <modelVersion>4.0.0</modelVersion>
 
-	<parent>
-		<groupId>org.apache.flink</groupId>
-		<artifactId>flink-libraries</artifactId>
-		<version>1.1-SNAPSHOT</version>
-		<relativePath>..</relativePath>
-	</parent>
-	
-	<artifactId>flink-cep_2.10</artifactId>
-	<name>flink-cep</name>
+    <parent>
+        <groupId>org.apache.flink</groupId>
+        <artifactId>flink-libraries</artifactId>
+        <version>1.1-SNAPSHOT</version>
+        <relativePath>..</relativePath>
+    </parent>
 
-	<dependencies>
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-core</artifactId>
-			<version>${project.version}</version>
-			<scope>provided</scope>
-		</dependency>
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-streaming-java_2.10</artifactId>
-			<version>${project.version}</version>
-			<scope>provided</scope>
-		</dependency>
-		<dependency>
-			<groupId>com.google.guava</groupId>
-			<artifactId>guava</artifactId>
-			<version>${guava.version}</version>
-		</dependency>
+    <artifactId>flink-cep_2.10</artifactId>
+    <name>flink-cep</name>
 
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-core</artifactId>
-			<version>${project.version}</version>
-			<type>test-jar</type>
-			<scope>test</scope>
-		</dependency>
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-core</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-streaming-java_2.10</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+            <version>${guava.version}</version>
+        </dependency>
 
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-streaming-java_2.10</artifactId>
-			<version>${project.version}</version>
-			<type>test-jar</type>
-			<scope>test</scope>
-		</dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-core</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
 
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-test-utils_2.10</artifactId>
-			<version>${project.version}</version>
-			<scope>test</scope>
-		</dependency>
-	</dependencies>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-streaming-java_2.10</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
 
-	<packaging>jar</packaging>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-test-utils_2.10</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-jar-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>test-jar</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+
+    <packaging>jar</packaging>
 </project>

http://git-wip-us.apache.org/repos/asf/flink/blob/e29ac036/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java
index 88505a4..57c5a9b 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java
@@ -61,7 +61,8 @@ public class PatternStream<T> {
 	public <R> DataStream<R> select(final PatternSelectFunction<T, R> patternSelectFunction) {
 		// we have to extract the output type from the provided pattern selection function manually
 		// because the TypeExtractor cannot do that if the method is wrapped in a MapFunction
-		TypeInformation<R> outTypeInfo = TypeExtractor.getUnaryOperatorReturnType(
+
+		TypeInformation<R> returnType = TypeExtractor.getUnaryOperatorReturnType(
 			patternSelectFunction,
 			PatternSelectFunction.class,
 			1,
@@ -70,8 +71,24 @@ public class PatternStream<T> {
 			null,
 			false);
 
+		return select(patternSelectFunction, returnType);
+	}
+
+	/**
+	 * Applies a select function to the detected pattern sequence. For each pattern sequence the
+	 * provided {@link PatternSelectFunction} is called. The pattern select function can produce
+	 * exactly one resulting element.
+	 *
+	 * @param patternSelectFunction The pattern select function which is called for each detected
+	 *                              pattern sequence.
+	 * @param <R> Type of the resulting elements
+	 * @param outTypeInfo Explicit specification of output type.
+	 * @return {@link DataStream} which contains the resulting elements from the pattern select
+	 *         function.
+	 */
+	public <R> DataStream<R> select(final PatternSelectFunction<T, R> patternSelectFunction, TypeInformation<R> outTypeInfo) {
 		return patternStream.map(
-			new PatternSelectMapper<T, R>(
+			new PatternSelectMapper<>(
 				patternStream.getExecutionEnvironment().clean(patternSelectFunction)))
 			.returns(outTypeInfo);
 	}
@@ -99,8 +116,24 @@ public class PatternStream<T> {
 			null,
 			false);
 
+		return flatSelect(patternFlatSelectFunction, outTypeInfo);
+	}
+
+	/**
+	 * Applies a flat select function to the detected pattern sequence. For each pattern sequence
+	 * the provided {@link PatternFlatSelectFunction} is called. The pattern flat select function
+	 * can produce an arbitrary number of resulting elements.
+	 *
+	 * @param patternFlatSelectFunction The pattern flat select function which is called for each
+	 *                                  detected pattern sequence.
+	 * @param <R> Typ of the resulting elements
+	 * @param outTypeInfo Explicit specification of output type.
+	 * @return {@link DataStream} which contains the resulting elements from the pattern flat select
+	 *         function.
+	 */
+	public <R> DataStream<R> flatSelect(final PatternFlatSelectFunction<T, R> patternFlatSelectFunction, TypeInformation<R> outTypeInfo) {
 		return patternStream.flatMap(
-			new PatternFlatSelectMapper<T, R>(
+			new PatternFlatSelectMapper<>(
 				patternStream.getExecutionEnvironment().clean(patternFlatSelectFunction)
 			)).returns(outTypeInfo);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/e29ac036/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/AndFilterFunction.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/AndFilterFunction.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/AndFilterFunction.java
index d01643d..ecaee07 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/AndFilterFunction.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/AndFilterFunction.java
@@ -41,4 +41,12 @@ public class AndFilterFunction<T> implements FilterFunction<T> {
 	public boolean filter(T value) throws Exception {
 		return left.filter(value) && right.filter(value);
 	}
+
+	public FilterFunction<T> getLeft() {
+		return left;
+	}
+
+	public FilterFunction<T> getRight() {
+		return right;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e29ac036/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
index 560ea0c..696518e 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
@@ -45,7 +45,7 @@ public class Pattern<T, F extends T> {
 	private final String name;
 
 	// previous pattern operator
-	private final Pattern<T, ?> previous;
+	private final Pattern<T, ? extends T> previous;
 
 	// filter condition for an event to be matched
 	private FilterFunction<F> filterFunction;
@@ -53,7 +53,7 @@ public class Pattern<T, F extends T> {
 	// window length in which the pattern match has to occur
 	private Time windowTime;
 
-	protected Pattern(final String name, final Pattern<T, ?> previous) {
+	protected Pattern(final String name, final Pattern<T, ? extends T> previous) {
 		this.name = name;
 		this.previous = previous;
 	}
@@ -62,7 +62,7 @@ public class Pattern<T, F extends T> {
 		return name;
 	}
 
-	public Pattern<T, ?> getPrevious() {
+	public Pattern<T, ? extends T> getPrevious() {
 		return previous;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e29ac036/flink-libraries/pom.xml
----------------------------------------------------------------------
diff --git a/flink-libraries/pom.xml b/flink-libraries/pom.xml
index 4f26754..b39cbd5 100644
--- a/flink-libraries/pom.xml
+++ b/flink-libraries/pom.xml
@@ -41,5 +41,6 @@ under the License.
 		<module>flink-table</module>
 		<module>flink-ml</module>
 		<module>flink-cep</module>
+		<module>flink-cep-scala</module>
 	</modules>
 </project>

http://git-wip-us.apache.org/repos/asf/flink/blob/e29ac036/flink-streaming-scala/pom.xml
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/pom.xml b/flink-streaming-scala/pom.xml
index 4310852..7358624 100644
--- a/flink-streaming-scala/pom.xml
+++ b/flink-streaming-scala/pom.xml
@@ -215,6 +215,19 @@ under the License.
 					<configLocation>${project.basedir}/../tools/maven/scalastyle-config.xml</configLocation>
 				</configuration>
 			</plugin>
+
+			<!-- Generate the test-jar -->
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-jar-plugin</artifactId>
+				<executions>
+					<execution>
+						<goals>
+							<goal>test-jar</goal>
+						</goals>
+					</execution>
+				</executions>
+			</plugin>
 		</plugins>
 	</build>
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e29ac036/flink-tests/src/test/scala/org/apache/flink/api/scala/completeness/ScalaAPICompletenessTestBase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/completeness/ScalaAPICompletenessTestBase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/completeness/ScalaAPICompletenessTestBase.scala
index d1055d0..907ad9f 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/completeness/ScalaAPICompletenessTestBase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/completeness/ScalaAPICompletenessTestBase.scala
@@ -82,6 +82,10 @@ abstract class ScalaAPICompletenessTestBase extends TestLogger {
     }
   }
 
+  protected def checkEquality(scalaInstance: AnyRef, extractJavaFun : ((AnyRef) => AnyRef)) {
+    val javaInstance = extractJavaFun(scalaInstance)
+  }
+
   /**
    * Tests to be performed to ensure API completeness.
    */


Mime
View raw message