flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Gyula Fóra <gyula.f...@gmail.com>
Subject Exception in CEP 1.1.2
Date Wed, 07 Sep 2016 14:43:47 GMT
Hi guys,

We tried building a simple pattern with the CEP library that matches 2
events with 2 filter conditions (where) but we get a strange error that
comes from the stream operator:

Pattern<Either<View, Click>, ?> viewAndClick = Pattern
                .<Either<View,, Click>> begin("view")
                .where(Either::isLeft)
                .followedBy("click").where(Either::isRight)
                .within(Time.hours(8));

CEP.pattern(stream, pattern).select(...);

We get the following exception when running this:
java.lang.RuntimeException: Failure happened in filter function.
at org.apache.flink.cep.nfa.NFA.computeNextStates(NFA.java:318)
at org.apache.flink.cep.nfa.NFA.process(NFA.java:162)
at
org.apache.flink.cep.operator.KeyedCEPPatternOperator.processEvent(KeyedCEPPatternOperator.java:48)
at
org.apache.flink.cep.operator.AbstractCEPBasePatternOperator.processElement(AbstractCEPBasePatternOperator.java:72)
at
org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.processElement(AbstractKeyedCEPPatternOperator.java:161)
at
org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:176)
at
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:66)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:266)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.IllegalStateException: Could not find previous shared
buffer entry with key: State(view, Normal, [
StateTransition(TAKE, click, with filter),
StateTransition(IGNORE, view),
]), value: Left(View[...]) and timestamp: 1473258371116. This can indicate
that the element belonging to the previous relation has been already
pruned, even though you expect it to be still there.
at org.apache.flink.cep.nfa.SharedBuffer.put(SharedBuffer.java:104)
at org.apache.flink.cep.nfa.NFA.computeNextStates(NFA.java:295)
... 9 more

Any ideas on what's going on here?

Thanks,
Gyula

Mime
View raw message