flink-user mailing list archives

From norman sp <wir12...@studserv.uni-leipzig.de>
Subject Flink CEP AbstractCEPPatternOperator fail after event detection
Date Wed, 06 Apr 2016 09:04:39 GMT
Hi,
I'm trying out the new CEP library but have some problems with event
detection.
In my case Flink detects the event pattern: A followed by B within 10
seconds.
But shortly after the detection, once the event pattern no longer matches,
the program crashes with the following error message:

04/06/2016 11:04:47	Job execution switched to status FAILING.
java.lang.NullPointerException
	at org.apache.flink.cep.nfa.SharedBuffer.extractPatterns(SharedBuffer.java:205)
	at org.apache.flink.cep.nfa.NFA.extractPatternMatches(NFA.java:305)
	at org.apache.flink.cep.nfa.NFA.process(NFA.java:142)
	at org.apache.flink.cep.operator.AbstractCEPPatternOperator.processEvent(AbstractCEPPatternOperator.java:93)
	at org.apache.flink.cep.operator.CEPPatternOperator.processWatermark(CEPPatternOperator.java:88)
	at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:158)
	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:65)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:224)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
	at java.lang.Thread.run(Thread.java:745)


After that, the job is restarted and runs fine until the
AbstractCEPPatternOperator fails again.

That's my code:
// Pattern: "start", followed by "FlowOver10", followed by "PressureOver10", all within 10 seconds.
Pattern<Tuple5<String, String, Double, Double, Double>, ?> FlowFirstPattern =
    Pattern.<Tuple5<String, String, Double, Double, Double>>begin("start")
        .followedBy("FlowOver10")
        .where(new FilterFunction<Tuple5<String, String, Double, Double, Double>>() { /* some filter */ })
        .followedBy("PressureOver10")
        .where(new FilterFunction<Tuple5<String, String, Double, Double, Double>>() { /* some filter */ })
        .within(Time.seconds(10));

// Apply the pattern to the input stream and print one warning per match.
PatternStream<Tuple5<String, String, Double, Double, Double>> FlowFirstPatternStream =
    CEP.pattern(windowedData, FlowFirstPattern);
DataStream<String> warning = FlowFirstPatternStream.select(new FlowPatternWarning());
warning.print();

private static class FlowPatternWarning implements
        PatternSelectFunction<Tuple5<String, String, Double, Double, Double>, String> {
    @Override
    public String select(Map<String, Tuple5<String, String, Double, Double, Double>> pat) throws Exception {
        Tuple5<String, String, Double, Double, Double> pressure = pat.get("PressureOver10");
        Tuple5<String, String, Double, Double, Double> flow = pat.get("FlowOver10");

        return "  #######   Warning! FlowPattern   ####### " + pressure.toString() + " - " + flow.toString();
    }
}
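
To make the intended semantics concrete ("A followed by B within 10 seconds"), here is a minimal plain-Java sketch without any Flink dependency. It is only an illustration under stated assumptions, not how Flink's NFA works internally: the `Event` class, the `FollowedByWithin` helper, the inclusive time bound, and the requirement that events arrive ordered by timestamp are all hypothetical choices made for this example.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical event type for illustration only: a name ("A" or "B") and a timestamp in milliseconds.
final class Event {
    final String name;
    final long timestampMillis;

    Event(String name, long timestampMillis) {
        this.name = name;
        this.timestampMillis = timestampMillis;
    }
}

final class FollowedByWithin {
    // Returns every (A, B) pair where B occurs after A and at most windowMillis later.
    // Other events may lie in between; the inclusive bound is a choice of this sketch.
    static List<Event[]> matches(List<Event> events, long windowMillis) {
        List<Event[]> result = new ArrayList<>();
        for (int i = 0; i < events.size(); i++) {
            Event first = events.get(i);
            if (!first.name.equals("A")) {
                continue;
            }
            for (int j = i + 1; j < events.size(); j++) {
                Event second = events.get(j);
                long gap = second.timestampMillis - first.timestampMillis;
                if (gap > windowMillis) {
                    break; // assumes the list is ordered by timestamp
                }
                if (second.name.equals("B")) {
                    result.add(new Event[] {first, second});
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Event> events = new ArrayList<>();
        events.add(new Event("A", 0L));
        events.add(new Event("B", 4_000L));   // 4 s after the first A: inside the window
        events.add(new Event("A", 20_000L));
        events.add(new Event("B", 35_000L));  // 15 s after the second A: outside the window
        System.out.println("matches: " + matches(events, 10_000L).size());
    }
}
```

In this sketch only the first A/B pair counts as a match; the second B arrives too late, which corresponds to the situation where the pattern "isn't matched anymore".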


How can I solve this?
I hope somebody can help me.

greetz Norman




--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-CEP-AbstractCEPPatternOperator-fail-after-event-detection-tp5948.html
Sent from the Apache Flink User Mailing List archive at Nabble.com.
