flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kklou...@apache.org
Subject flink git commit: [FLINK-6658] [cep] Use scala Collections in scala CEP API.
Date Fri, 26 May 2017 12:49:33 GMT
Repository: flink
Updated Branches:
  refs/heads/master 38c45f805 -> 63f182a4f


[FLINK-6658] [cep] Use scala Collections in scala CEP API.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/63f182a4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/63f182a4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/63f182a4

Branch: refs/heads/master
Commit: 63f182a4f0b516281aa9efedd6c5d9e6904d4999
Parents: 38c45f8
Author: Dawid Wysakowicz <dawid@getindata.com>
Authored: Mon May 22 17:14:12 2017 +0200
Committer: kkloudas <kkloudas@gmail.com>
Committed: Fri May 26 14:47:42 2017 +0200

----------------------------------------------------------------------
 .../scala/org/apache/flink/cep/scala/CEP.scala  |  3 +-
 .../apache/flink/cep/scala/PatternStream.scala  | 37 +++++++++-----------
 .../flink/cep/scala/conditions/Context.scala    | 33 +++++++++++++++++
 .../org/apache/flink/cep/scala/package.scala    | 11 +++++-
 .../flink/cep/scala/pattern/Pattern.scala       | 34 +++++++++++++++---
 .../flink/cep/scala/pattern/package.scala       | 10 ++++++
 ...StreamScalaJavaAPIInteroperabilityTest.scala | 37 ++++++++++----------
 7 files changed, 118 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/63f182a4/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/CEP.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/CEP.scala b/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/CEP.scala
index f28aee2..05c8e8c 100644
--- a/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/CEP.scala
+++ b/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/CEP.scala
@@ -36,8 +36,7 @@ object CEP {
     * @tparam T Type of the input events
     * @return Resulting pattern stream
     */
-  def pattern[T](input: DataStream[T],
-                                  pattern: Pattern[T, _]): PatternStream[T] = {
+  def pattern[T](input: DataStream[T], pattern: Pattern[T, _]): PatternStream[T] = {
     wrapPatternStream(JCEP.pattern(input.javaStream, pattern.wrappedPattern))
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/63f182a4/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/PatternStream.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/PatternStream.scala b/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/PatternStream.scala
index e71439c..ee7b6a9 100644
--- a/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/PatternStream.scala
+++ b/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/PatternStream.scala
@@ -31,9 +31,7 @@ import java.lang.{Long => JLong}
 
 import org.apache.flink.cep.operator.CEPOperatorUtils
 import org.apache.flink.cep.scala.pattern.Pattern
-
-import scala.collection.JavaConverters._
-import scala.collection.mutable
+import scala.collection.Map
 
 /**
   * Stream abstraction for CEP pattern detection. A pattern stream is a stream which emits detected
@@ -199,14 +197,12 @@ class PatternStream[T](jPatternStream: JPatternStream[T]) {
     * @tparam R Type of the resulting elements
     * @return [[DataStream]] which contains the resulting elements from the pattern select function.
     */
-  def select[R: TypeInformation](
-    patternSelectFun: mutable.Map[String, JList[T]] => R)
-  : DataStream[R] = {
+  def select[R: TypeInformation](patternSelectFun: Map[String, Iterable[T]] => R): DataStream[R] = {
     val cleanFun = cleanClosure(patternSelectFun)
 
     val patternSelectFunction: PatternSelectFunction[T, R] = new PatternSelectFunction[T, R] {
 
-      def select(in: JMap[String, JList[T]]): R = cleanFun(in.asScala)
+      def select(in: JMap[String, JList[T]]): R = cleanFun(mapToScala(in))
     }
     select(patternSelectFunction)
   }
@@ -232,20 +228,20 @@ class PatternStream[T](jPatternStream: JPatternStream[T]) {
     *         events.
     */
   def select[L: TypeInformation, R: TypeInformation](
-      patternTimeoutFunction: (mutable.Map[String, JList[T]], Long) => L) (
-      patternSelectFunction: mutable.Map[String, JList[T]] => R)
+      patternTimeoutFunction: (Map[String, Iterable[T]], Long) => L) (
+      patternSelectFunction: Map[String, Iterable[T]] => R)
     : DataStream[Either[L, R]] = {
 
     val cleanSelectFun = cleanClosure(patternSelectFunction)
     val cleanTimeoutFun = cleanClosure(patternTimeoutFunction)
 
     val patternSelectFun = new PatternSelectFunction[T, R] {
-      override def select(pattern: JMap[String, JList[T]]): R = cleanSelectFun(pattern.asScala)
+      override def select(pattern: JMap[String, JList[T]]): R =
+        cleanSelectFun(mapToScala(pattern))
     }
     val patternTimeoutFun = new PatternTimeoutFunction[T, L] {
-      override def timeout(pattern: JMap[String, JList[T]], timeoutTimestamp: Long): L = {
-        cleanTimeoutFun(pattern.asScala, timeoutTimestamp)
-      }
+      override def timeout(pattern: JMap[String, JList[T]], timeoutTimestamp: Long): L =
+        cleanTimeoutFun(mapToScala(pattern), timeoutTimestamp)
     }
 
     select(patternTimeoutFun, patternSelectFun)
@@ -262,7 +258,7 @@ class PatternStream[T](jPatternStream: JPatternStream[T]) {
     * @return [[DataStream]] which contains the resulting elements from the pattern flat select
     *         function.
     */
-  def flatSelect[R: TypeInformation](patternFlatSelectFun: (mutable.Map[String, JList[T]],
+  def flatSelect[R: TypeInformation](patternFlatSelectFun: (Map[String, Iterable[T]],
     Collector[R]) => Unit): DataStream[R] = {
     val cleanFun = cleanClosure(patternFlatSelectFun)
 
@@ -270,7 +266,7 @@ class PatternStream[T](jPatternStream: JPatternStream[T]) {
       new PatternFlatSelectFunction[T, R] {
 
         def flatSelect(pattern: JMap[String, JList[T]], out: Collector[R]): Unit =
-          cleanFun(pattern.asScala, out)
+          cleanFun(mapToScala(pattern), out)
       }
     flatSelect(patternFlatSelectFunction)
   }
@@ -296,17 +292,16 @@ class PatternStream[T](jPatternStream: JPatternStream[T]) {
     *         timeout events wrapped in a [[Either]] type.
     */
   def flatSelect[L: TypeInformation, R: TypeInformation](
-      patternFlatTimeoutFunction: (mutable.Map[String, JList[T]], Long, Collector[L]) => Unit) (
-      patternFlatSelectFunction: (mutable.Map[String, JList[T]], Collector[R]) => Unit)
+      patternFlatTimeoutFunction: (Map[String, Iterable[T]], Long, Collector[L]) => Unit) (
+      patternFlatSelectFunction: (Map[String, Iterable[T]], Collector[R]) => Unit)
     : DataStream[Either[L, R]] = {
 
     val cleanSelectFun = cleanClosure(patternFlatSelectFunction)
     val cleanTimeoutFun = cleanClosure(patternFlatTimeoutFunction)
 
     val patternFlatSelectFun = new PatternFlatSelectFunction[T, R] {
-      override def flatSelect(pattern: JMap[String, JList[T]], out: Collector[R]): Unit = {
-        cleanSelectFun(pattern.asScala, out)
-      }
+      override def flatSelect(pattern: JMap[String, JList[T]], out: Collector[R]): Unit =
+        cleanSelectFun(mapToScala(pattern), out)
     }
 
     val patternFlatTimeoutFun = new PatternFlatTimeoutFunction[T, L] {
@@ -314,7 +309,7 @@ class PatternStream[T](jPatternStream: JPatternStream[T]) {
         pattern: JMap[String, JList[T]],
         timeoutTimestamp: Long, out: Collector[L])
       : Unit = {
-        cleanTimeoutFun(pattern.asScala, timeoutTimestamp, out)
+        cleanTimeoutFun(mapToScala(pattern), timeoutTimestamp, out)
       }
     }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/63f182a4/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/conditions/Context.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/conditions/Context.scala b/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/conditions/Context.scala
new file mode 100644
index 0000000..8bb7133
--- /dev/null
+++ b/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/conditions/Context.scala
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.cep.scala.conditions
+
+import java.io.Serializable
+
+/**
+  * The context used when evaluating the {@link IterativeCondition condition}.
+  */
+trait Context[T] extends Serializable {
+  /**
+    * @return An [[Iterable]] over the already accepted elements
+    *                    for a given pattern. Elements are iterated in the order they were
+    *                    inserted in the pattern.
+    * @param name The name of the pattern.
+    */
+  def getEventsForPattern(name: String): Iterable[T]
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/63f182a4/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/package.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/package.scala b/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/package.scala
index 179174b..223e9c9 100644
--- a/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/package.scala
+++ b/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/package.scala
@@ -17,11 +17,16 @@
  */
 package org.apache.flink.cep
 
+import java.util.{List => JList, Map => JMap}
+
 import org.apache.flink.api.scala.ClosureCleaner
 import org.apache.flink.cep.{PatternStream => JPatternStream}
 
 package object scala {
 
+  import collection.JavaConverters._
+  import collection.Map
+
   /**
     * Utility method to wrap [[org.apache.flink.cep.PatternStream]] for usage with the Scala API.
     *
@@ -40,7 +45,11 @@ package object scala {
 
   private[flink] def cleanClosure[F <: AnyRef](f: F, checkSerializable: Boolean = true): F = {
     ClosureCleaner.clean(f, checkSerializable)
-    return f
+    f
+  }
+
+  private[flink] def mapToScala[T](map: JMap[String, JList[T]]): Map[String, Iterable[T]] = {
+    map.asScala.mapValues(_.asScala.toIterable)
   }
 }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/63f182a4/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala b/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala
index fe7a30c..c77e70d 100644
--- a/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala
+++ b/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala
@@ -18,9 +18,10 @@
 package org.apache.flink.cep.scala.pattern
 
 import org.apache.flink.cep
-import org.apache.flink.cep.pattern.conditions.IterativeCondition
-import org.apache.flink.cep.pattern.conditions.IterativeCondition.Context
+import org.apache.flink.cep.pattern.conditions.IterativeCondition.{Context => JContext}
+import org.apache.flink.cep.pattern.conditions.{IterativeCondition, SimpleCondition}
 import org.apache.flink.cep.pattern.{MalformedPatternException, Quantifier, Pattern => JPattern}
+import org.apache.flink.cep.scala.conditions.Context
 import org.apache.flink.streaming.api.windowing.time.Time
 
 /**
@@ -103,7 +104,9 @@ class Pattern[T , F <: T](jPattern: JPattern[T, F]) {
     val condFun = new IterativeCondition[F] {
       val cleanCond = cep.scala.cleanClosure(condition)
 
-      override def filter(value: F, ctx: Context[F]): Boolean = cleanCond(value, ctx)
+      override def filter(value: F, ctx: JContext[F]): Boolean = {
+        cleanCond(value, new JContextWrapper(ctx))
+      }
     }
     where(condFun)
   }
@@ -122,7 +125,7 @@ class Pattern[T , F <: T](jPattern: JPattern[T, F]) {
     val condFun = new IterativeCondition[F] {
       val cleanCond = cep.scala.cleanClosure(condition)
 
-      override def filter(value: F, ctx: Context[F]): Boolean = cleanCond(value)
+      override def filter(value: F, ctx: JContext[F]): Boolean = cleanCond(value)
     }
     where(condFun)
   }
@@ -152,11 +155,32 @@ class Pattern[T , F <: T](jPattern: JPattern[T, F]) {
     * @param condition The {{{OR}}} condition.
     * @return The pattern with the new condition is set.
     */
+  def or(condition: F => Boolean): Pattern[T, F] = {
+    val condFun = new SimpleCondition[F] {
+      val cleanCond = cep.scala.cleanClosure(condition)
+
+      override def filter(value: F): Boolean =
+        cleanCond(value)
+    }
+    or(condFun)
+  }
+
+  /**
+    * Adds a condition that has to be satisfied by an event
+    * in order to be considered a match. If another condition has already been
+    * set, the new one is going to be combined with the previous with a
+    * logical {{{OR}}}. In other case, this is going to be the only
+    * condition.
+    *
+    * @param condition The {{{OR}}} condition.
+    * @return The pattern with the new condition is set.
+    */
   def or(condition: (F, Context[F]) => Boolean): Pattern[T, F] = {
     val condFun = new IterativeCondition[F] {
       val cleanCond = cep.scala.cleanClosure(condition)
 
-      override def filter(value: F, ctx: Context[F]): Boolean = cleanCond(value, ctx)
+      override def filter(value: F, ctx: JContext[F]): Boolean =
+        cleanCond(value, new JContextWrapper(ctx))
     }
     or(condFun)
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/63f182a4/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/package.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/package.scala b/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/package.scala
index 26355a5..13305dd 100644
--- a/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/package.scala
+++ b/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/package.scala
@@ -18,6 +18,9 @@
 package org.apache.flink.cep.scala
 
 import org.apache.flink.cep.pattern.{Pattern => JPattern}
+import org.apache.flink.cep.pattern.conditions.IterativeCondition.{Context => JContext}
+import org.apache.flink.cep.scala.conditions.Context
+import scala.collection.JavaConverters._
 
 package object pattern {
   /**
@@ -34,5 +37,12 @@ package object pattern {
     case p: JPattern[T, F] => Some(Pattern[T, F](p))
     case _ => None
   }
+
+  private[pattern] class JContextWrapper[F](private val jContext: JContext[F])
+    extends Context[F] with Serializable {
+
+    override def getEventsForPattern(name: String): Iterable[F] =
+      jContext.getEventsForPattern(name).asScala
+  }
 }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/63f182a4/flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/PatternStreamScalaJavaAPIInteroperabilityTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/PatternStreamScalaJavaAPIInteroperabilityTest.scala b/flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/PatternStreamScalaJavaAPIInteroperabilityTest.scala
index e92c268..43bef23 100644
--- a/flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/PatternStreamScalaJavaAPIInteroperabilityTest.scala
+++ b/flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/PatternStreamScalaJavaAPIInteroperabilityTest.scala
@@ -31,7 +31,7 @@ import java.util.{Map => JMap}
 import java.util.{List => JList}
 
 import scala.collection.JavaConverters._
-import scala.collection.mutable
+import scala.collection.Map
 import org.junit.Assert._
 import org.junit.Test
 
@@ -44,15 +44,15 @@ class PatternStreamScalaJavaAPIInteroperabilityTest extends TestLogger {
     val dummyDataStream: DataStream[(Int, Int)] = env.fromElements()
     val pattern: Pattern[(Int, Int), _] = Pattern.begin[(Int, Int)]("dummy")
     val pStream: PatternStream[(Int, Int)] = CEP.pattern(dummyDataStream, pattern)
-    val param = mutable.Map("begin" -> List((1, 2)).asJava).asJava
+    val param = Map("begin" -> List((1, 2)))
     val result: DataStream[(Int, Int)] = pStream
-      .select((pattern: mutable.Map[String, JList[(Int, Int)]]) => {
+      .select((pattern: Map[String, Iterable[(Int, Int)]]) => {
         //verifies input parameter forwarding
-        assertEquals(param, pattern.asJava)
+        assertEquals(param, pattern)
         param.get("begin").get(0)
       })
     val out = extractUserFunction[StreamMap[JMap[String, JList[(Int, Int)]], (Int, Int)]](result)
-      .getUserFunction.map(param)
+      .getUserFunction.map(param.mapValues(_.asJava).asJava)
     //verifies output parameter forwarding
     assertEquals(param.get("begin").get(0), out)
   }
@@ -65,20 +65,20 @@ class PatternStreamScalaJavaAPIInteroperabilityTest extends TestLogger {
     val pattern: Pattern[List[Int], _] = Pattern.begin[List[Int]]("dummy")
     val pStream: PatternStream[List[Int]] = CEP.pattern(dummyDataStream, pattern)
     val inList = List(1, 2, 3)
-    val inParam = mutable.Map("begin" -> List(inList).asJava).asJava
+    val inParam = Map("begin" -> List(inList))
     val outList = new java.util.ArrayList[List[Int]]
     val outParam = new ListCollector[List[Int]](outList)
 
     val result: DataStream[List[Int]] = pStream
 
-      .flatSelect((pattern: mutable.Map[String, JList[List[Int]]], out: Collector[List[Int]]) => {
+      .flatSelect((pattern: Map[String, Iterable[List[Int]]], out: Collector[List[Int]]) => {
         //verifies input parameter forwarding
-        assertEquals(inParam, pattern.asJava)
-        out.collect(pattern.get("begin").get.get(0))
+        assertEquals(inParam, pattern)
+        out.collect(pattern.get("begin").get.head)
       })
 
     extractUserFunction[StreamFlatMap[java.util.Map[String, JList[List[Int]]], List[Int]]](result).
-      getUserFunction.flatMap(inParam, outParam)
+      getUserFunction.flatMap(inParam.mapValues(_.asJava).asJava, outParam)
     //verify output parameter forwarding and that flatMap function was actually called
     assertEquals(inList, outList.get(0))
   }
@@ -90,22 +90,22 @@ class PatternStreamScalaJavaAPIInteroperabilityTest extends TestLogger {
     val dummyDataStream: DataStream[String] = env.fromElements()
     val pattern: Pattern[String, _] = Pattern.begin[String]("dummy")
     val pStream: PatternStream[String] = CEP.pattern(dummyDataStream, pattern)
-    val inParam = mutable.Map("begin" -> List("barfoo").asJava).asJava
+    val inParam = Map("begin" -> List("barfoo"))
     val outList = new java.util.ArrayList[Either[String, String]]
     val output = new ListCollector[Either[String, String]](outList)
     val expectedOutput = List(Right("match"), Right("barfoo"), Left("timeout"), Left("barfoo"))
       .asJava
 
     val result: DataStream[Either[String, String]] = pStream.flatSelect {
-        (pattern: mutable.Map[String, JList[String]], timestamp: Long, out: Collector[String]) =>
+        (pattern: Map[String, Iterable[String]], timestamp: Long, out: Collector[String]) =>
           out.collect("timeout")
-          out.collect(pattern("begin").get(0))
+          out.collect(pattern("begin").head)
       } {
-        (pattern: mutable.Map[String, JList[String]], out: Collector[String]) =>
+        (pattern: Map[String, Iterable[String]], out: Collector[String]) =>
           //verifies input parameter forwarding
-          assertEquals(inParam, pattern.asJava)
+          assertEquals(inParam, pattern)
           out.collect("match")
-          out.collect(pattern("begin").get(0))
+          out.collect(pattern("begin").head)
       }
 
     val fun = extractUserFunction[
@@ -115,8 +115,9 @@ class PatternStreamScalaJavaAPIInteroperabilityTest extends TestLogger {
           JMap[String, JList[String]]],
         Either[String, String]]](result)
 
-    fun.getUserFunction.flatMap(FEither.Right(inParam), output)
-    fun.getUserFunction.flatMap(FEither.Left(FTuple2.of(inParam, 42L)), output)
+    fun.getUserFunction.flatMap(FEither.Right(inParam.mapValues(_.asJava).asJava), output)
+    fun.getUserFunction.flatMap(FEither.Left(FTuple2.of(inParam.mapValues(_.asJava).asJava, 42L)),
+                                output)
 
     assertEquals(expectedOutput, outList)
   }


Mime
View raw message