flink-user mailing list archives

From Dawid Wysakowicz <wysakowicz.da...@gmail.com>
Subject Re: FlinkCEP behaviour with time constraints not as expected
Date Wed, 08 Nov 2017 12:51:12 GMT
Unfortunately there is a mistake in the docs: the return type should be DataStream rather than
SingleOutputStreamOperator.

The correct version should be:

val patternStream: PatternStream[Event] = CEP.pattern(input, pattern)

val outputTag = OutputTag[String]("side-output")

val result: DataStream[ComplexEvent] = patternStream.select(outputTag){
	(pattern: Map[String, Iterable[Event]], timestamp: Long) => TimeoutEvent()
} {
	pattern: Map[String, Iterable[Event]] => ComplexEvent()
}

This syntax is only available in 1.4, though; in previous versions timed-out events were not
returned via a side output.
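
For completeness, here is a minimal sketch of reading the timed-out results back from the side output in 1.4. It assumes the Scala DataStream API's getSideOutput; the helper object TimeoutSideOutput and its method timedOut are hypothetical names used only for illustration. It also assumes that the tag's element type has to match what the timeout function returns, so with OutputTag[String] the first function passed to select would return a String (alternatively the tag could be typed as OutputTag[TimeoutEvent]).

```scala
import org.apache.flink.streaming.api.scala._

// Hypothetical helper, not part of Flink: it only wraps the getSideOutput call.
object TimeoutSideOutput {

  // Same tag id and element type as in the snippet above.
  val outputTag: OutputTag[String] = OutputTag[String]("side-output")

  // `result` stands for the DataStream returned by
  // patternStream.select(outputTag){ ... }{ ... }.
  // The timed-out partial matches are read from the side output under the same tag.
  def timedOut[R](result: DataStream[R]): DataStream[String] =
    result.getSideOutput(outputTag)
}
```

With this in place, something like TimeoutSideOutput.timedOut(result).print() should show the timed-out partial matches separately from the complete matches in result.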



> On 8 Nov 2017, at 12:18, Federico D'Ambrosio <federico.dambrosio@smartlab.ws> wrote:
> 
> Thank you very much, Dawid, for your thorough explanation, really useful. I totally missed
> the distinction between timed-out events and complete matches.
> 
> I'd like to ask you one more thing, about the FlinkCEP Scala API: in the documentation,
> there is the following code:
> 
> val patternStream: PatternStream[Event] = CEP.pattern(input, pattern)
> 
> val outputTag = OutputTag[String]("side-output")
> 
> val result: SingleOutputStreamOperator[ComplexEvent] = patternStream.select(outputTag){
> 	(pattern: Map[String, Iterable[Event]], timestamp: Long) => TimeoutEvent()
> } {
> 	pattern: Map[String, Iterable[Event]] => ComplexEvent()
> }
> 
> where result would then be used to get the outputTag side output.
> If I paste this code, I get an error that the select function is missing its parameters
> ("Unspecified value parameters: patternSelectFunction: PatternSelectFunction[ComplexEvent, NotInferredR]"),
> while if I add the type parameters explicitly, such as
> 
> patternStream.select[TimeoutEvent, ComplexEvent]
> 
> I get "Too many arguments for select". Am I missing something?
> 
> Thank you very much,
> Federico
> 
> 2017-11-07 16:34 GMT+01:00 Dawid Wysakowicz <wysakowicz.dawid@gmail.com>:
> Hi Federico,
> 
> For your given input and pattern there should be (and there are) only two timed-out patterns:
> 
> 5> Left(Map(start -> List(Event(100,2017-11-05T03:56:02))))
> 5> Left(Map(start -> List(Event(100,2017-11-05T06:00:02))))
> 
> This is because your pattern says that the next event after the events with value >= 100 must
> not have value >= 100, and within your timeout there is no sequence of events matching
> (>=100)+ (<100).
> 
> But I will try to explain how it works with the same input for the following pattern:
> 
> Pattern[Event].begin("start").where(_.value >=100).oneOrMore
> .notNext("end").where(_.value <100).within(Time.minutes(30))
> 
> Then we have matches:
> 
> 5> Right(Map(start -> List(Event(100,2017-11-05T03:50:02))))
> 5> Right(Map(start -> List(Event(100,2017-11-05T03:50:02), Event(100,2017-11-05T03:52:02))))
> 5> Right(Map(start -> List(Event(100,2017-11-05T03:52:02))))
> 5> Right(Map(start -> List(Event(100,2017-11-05T03:50:02), Event(100,2017-11-05T03:52:02),
> Event(100,2017-11-05T03:54:02))))
> 5> Right(Map(start -> List(Event(100,2017-11-05T03:52:02), Event(100,2017-11-05T03:54:02))))
> 5> Right(Map(start -> List(Event(100,2017-11-05T03:54:02))))
> 
> and timed-out partial matches:
> 
> 5> Left(Map(start -> List(Event(100,2017-11-05T03:50:02), Event(100,2017-11-05T03:52:02),
> Event(100,2017-11-05T03:54:02), Event(100,2017-11-05T03:56:02))))
> 5> Left(Map(start -> List(Event(100,2017-11-05T03:52:02), Event(100,2017-11-05T03:54:02),
> Event(100,2017-11-05T03:56:02))))
> 5> Left(Map(start -> List(Event(100,2017-11-05T03:54:02), Event(100,2017-11-05T03:56:02))))
> 5> Left(Map(start -> List(Event(100,2017-11-05T03:56:02))))
> 5> Left(Map(start -> List(Event(100,2017-11-05T06:00:02))))
> 
> Right now (in Flink 1.3.2) a pattern can start on each event (in 1.4 you will be able to
> specify an AFTER_MATCH_SKIP strategy, see: https://issues.apache.org/jira/browse/FLINK-7169),
> therefore you see matches starting at 2017-11-05T03:50:02, 2017-11-05T03:52:02 and 2017-11-05T03:54:02.
> Also, right now oneOrMore is not greedy (in 1.4 you will be able to alter it, see:
> https://issues.apache.org/jira/browse/FLINK-7147), therefore you see matches like
> List(Event(100,2017-11-05T03:50:02)) and
> List(Event(100,2017-11-05T03:50:02), Event(100,2017-11-05T03:52:02)) rather than only
> one of those.
> 
> The timed-out partial matches are returned because within the timeout there was no event
> with value <100 (in fact there was no event at all to be checked).
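
The enumeration described above can be reproduced with a plain-Scala toy model. This is not FlinkCEP code and says nothing about how its NFA is implemented; it only encodes the two rules stated in this message (a run may start at every event, and oneOrMore is not greedy, so every contiguous run is a candidate). Timestamps are simplified to minute offsets from 03:50:02, and the names Event.minute, RunEnumeration and classify are hypothetical. Watermarks are ignored, so a run with no follow-up event is simply treated as timed out.

```scala
// Toy event: `minute` is the offset in minutes from 03:50:02 (a simplification).
final case class Event(value: Int, minute: Int)

object RunEnumeration {
  type Run = Vector[Event]

  val windowMinutes = 30

  // The five events from the original question; 06:00:02 is minute 130.
  val events: Vector[Event] = Vector(
    Event(100, 0), Event(100, 2), Event(100, 4), Event(100, 6), Event(100, 130))

  private def inWindow(first: Event, e: Event): Boolean =
    e.minute - first.minute <= windowMinutes

  // Right = complete match, Left = timed-out partial match.
  def classify(es: Vector[Event]): Vector[Either[Run, Run]] = {
    // Every contiguous run of events with value >= 100 inside the window of its
    // first event, paired with the follow-up event if that is also in the window.
    val candidates: Vector[(Run, Option[Event])] = for {
      start <- es.indices.toVector
      end   <- (start until es.size).toVector
      run    = es.slice(start, end + 1)
      if run.forall(e => e.value >= 100 && inWindow(run.head, e))
    } yield (run, es.lift(end + 1).filter(n => inWindow(run.head, n)))

    candidates.collect {
      // notNext(value < 100) holds: the follow-up event has value >= 100.
      case (run, Some(next)) if next.value >= 100 => Right[Run, Run](run)
      // No follow-up event inside the window: the partial match times out.
      case (run, None)                            => Left[Run, Run](run)
      // A follow-up event with value < 100 violates notNext; the run is dropped.
    }
  }
}
```

Calling RunEnumeration.classify(RunEnumeration.events) yields six Right and five Left entries, the same sets as listed above (in a different order).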
> 
> Hope this "study" helps you understand the behaviour. If you feel I missed something,
> please provide an example I could reproduce.
> 
> Regards,
> Dawid
> 
> 2017-11-07 11:29 GMT+01:00 Ufuk Celebi <uce@apache.org>:
> Hey Federico,
> 
> let me pull in Dawid (cc'd) who works on CEP. He can probably clarify
> the expected behaviour here.
> 
> Best,
> 
> Ufuk
> 
> 
> On Mon, Nov 6, 2017 at 12:06 PM, Federico D'Ambrosio
> <federico.dambrosio@smartlab.ws> wrote:
> > Hi everyone,
> >
> > I wanted to ask if FlinkCEP in the following scenario is working as it
> > should, or I have misunderstood its functioning.
> >
> > I've got a keyedstream associated with the following pattern:
> >
> > Pattern[Event].begin("start").where(_.value >=100).oneOrMore
> > .notNext("end").where(_.value >=100).within(Time.minutes(30))
> >
> > Considering a single key in the stream, for simplicity, I've got the
> > following sequence of events (using EventTime on the "time" field of the
> > json event):
> >
> > {value: 100, time: "2017-11-05 03:50:02.000"}
> > {value: 100, time: "2017-11-05 03:52:02.000"}
> > {value: 100, time: "2017-11-05 03:54:02.000"}
> > {value: 100, time: "2017-11-05 03:56:02.000"} // end of events within the 30
> > minutes from the first event
> > {value: 100, time: "2017-11-05 06:00:02.000"}
> >
> > Now, when it comes to the select/flatselect function, I tried printing the
> > content of the pattern map and what I noticed is that, for example, the
> > first 2 events weren't considered in the same pattern as the map was like
> > the following:
> >
> > {start=[{value: 100, time: 2017-11-05 03:50:02.000}]}
> > {start=[{value: 100, time: 2017-11-05 03:52:02.000}]}
> >
> > Now, shouldn't they be in the same List, as they belong to the same
> > iterative pattern, defined with the oneOrMore clause?
> >
> > Thank you for your insight,
> > Federico D'Ambrosio
> 
> 
> 
> 
> --
> Federico D'Ambrosio

