flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-6658) Use scala Collections in scala CEP API
Date Tue, 23 May 2017 17:12:04 GMT

    [ https://issues.apache.org/jira/browse/FLINK-6658?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16021495#comment-16021495 ]

ASF GitHub Bot commented on FLINK-6658:
---------------------------------------

Github user dawidwys commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3963#discussion_r118050667
  
    --- Diff: flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/PatternStream.scala ---
    @@ -296,25 +294,94 @@ class PatternStream[T](jPatternStream: JPatternStream[T]) {
         *         timeout events wrapped in a [[Either]] type.
         */
       def flatSelect[L: TypeInformation, R: TypeInformation](
    -      patternFlatTimeoutFunction: (mutable.Map[String, JList[T]], Long, Collector[L]) => Unit) (
    -      patternFlatSelectFunction: (mutable.Map[String, JList[T]], Collector[R]) => Unit)
    +      patternFlatTimeoutFunction: (Map[String, Iterable[T]], Long, Collector[L]) => Unit) (
    +      patternFlatSelectFunction: (Map[String, Iterable[T]], Collector[R]) => Unit)
         : DataStream[Either[L, R]] = {
     
         val cleanSelectFun = cleanClosure(patternFlatSelectFunction)
         val cleanTimeoutFun = cleanClosure(patternFlatTimeoutFunction)
     
         val patternFlatSelectFun = new PatternFlatSelectFunction[T, R] {
    -      override def flatSelect(pattern: JMap[String, JList[T]], out: Collector[R]): Unit = {
    -        cleanSelectFun(pattern.asScala, out)
    +      override def flatSelect(pattern: JMap[String, JList[T]], out: Collector[R]): Unit =
    +        cleanSelectFun(mapToScala(pattern), out)
    +    }
    +
    +    val patternFlatTimeoutFun = new PatternFlatTimeoutFunction[T, L] {
    +      override def timeout(
    +        pattern: JMap[String, JList[T]],
    +        timeoutTimestamp: Long, out: Collector[L])
    +      : Unit = {
    +        cleanTimeoutFun(mapToScala(pattern), timeoutTimestamp, out)
           }
         }
     
    --- End diff --
    
    This is the Scala way of applying `flatFunction`. We also provide both of these alternatives for, e.g., `DataStream#flatMap`.
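
To illustrate the two alternatives mentioned here, below is a minimal sketch in plain Scala. It does not use Flink: the `Collector` trait is a hypothetical stand-in for Flink's collector, and `FlatStyles`, `flatMapWithCollector` and `flatMapWithResult` are made-up names for illustration only, not the actual `DataStream` or `PatternStream` signatures. The first style hands the function a collector to emit into, the second lets the function return its results as a collection.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical stand-in for Flink's Collector, so the sketch runs without Flink.
trait Collector[R] {
  def collect(record: R): Unit
}

object FlatStyles {
  // Style 1: the user function emits results into a collector and returns Unit.
  def flatMapWithCollector[T, R](input: Seq[T])(fun: (T, Collector[R]) => Unit): Seq[R] = {
    val buffer = ListBuffer.empty[R]
    val out = new Collector[R] {
      override def collect(record: R): Unit = { buffer += record; () }
    }
    input.foreach(elem => fun(elem, out))
    buffer.toList
  }

  // Style 2: the user function returns its results as a Scala collection.
  def flatMapWithResult[T, R](input: Seq[T])(fun: T => Iterable[R]): Seq[R] =
    input.flatMap(fun)
}

object FlatStylesDemo {
  def main(args: Array[String]): Unit = {
    val lines = Seq("a b", "c")

    // Collector-based variant: emit each token explicitly.
    val viaCollector = FlatStyles.flatMapWithCollector[String, String](lines) { (line, out) =>
      line.split(" ").foreach(token => out.collect(token))
    }

    // Collection-returning variant: return all tokens at once.
    val viaResult = FlatStyles.flatMapWithResult[String, String](lines)(line => line.split(" ").toSeq)

    println(viaCollector)
    println(viaResult)
  }
}
```

Both variants produce the same elements for the same input; the collector style avoids building an intermediate collection per element, while the collection-returning style usually reads more naturally in Scala.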


> Use scala Collections in scala CEP API
> --------------------------------------
>
>                 Key: FLINK-6658
>                 URL: https://issues.apache.org/jira/browse/FLINK-6658
>             Project: Flink
>          Issue Type: Bug
>          Components: CEP
>    Affects Versions: 1.3.0
>            Reporter: Dawid Wysakowicz
>            Assignee: Dawid Wysakowicz
>




--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
