flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From norman sp <wir12...@studserv.uni-leipzig.de>
Subject Re: Flink CEP AbstractCEPPatternOperator fail after event detection
Date Thu, 07 Apr 2016 09:05:02 GMT
Hi Till,
thank you. here's the code:

public class CepStorzSimulator {
	
	public static void main(String[] args) throws Exception {
	    	
			final ParameterTool parameterTool = ParameterTool.fromArgs(args);
	
			if(parameterTool.getNumberOfParameters() < 3) {
				System.out.println("Missing parameters!\nUsage: Kafka --topic <topic>
--bootstrap.servers <kafka brokers> --group.id <some id>");
				System.exit(1);
			}
						
			CepStorzSimulator reader = new CepStorzSimulator();
	    	reader.run(parameterTool);
	}
	
	public void run(ParameterTool parameterTool) throws Exception {
		
		String topic = "test-simulator";        

		StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
                //env.getConfig().disableSysoutLogging();
		env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4,
5000));
		//env.enableCheckpointing(15000); 									// create a checkpoint every 5
seconds
		env.setParallelism(4);
		env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime); 
		
        DataStream<String> kafkaStream = env.addSource(new
FlinkKafkaConsumer08<>(topic, new SimpleStringSchema(),
parameterTool.getProperties()));

        DataStream<Tuple5&lt;String, String, String, String, Double>> data =
kafkaStream.flatMap(new SplitMapper());     
        
        SingleOutputStreamOperator<Tuple6&lt;String, String, String, Double,
Double, Double>> windowedData = 
				 data.filter(new FilterFunction<Tuple5&lt;String, String, String,
String, Double>>() {

					private static final long serialVersionUID = -5952425756492833594L;

						@Override
						public boolean filter(Tuple5<String, String, String, String, Double>
val) throws Exception {

							return val.f3.contains("target - Value");
						}
					 })
					 .keyBy(3)
					 .timeWindow(Time.seconds(10), Time.seconds(1))
					 .fold(new Tuple6<>("", "", "", 0.0d, 0.0d, 0.0d), new
pressureElementCount());
					 
		windowedData.print();
		
		Pattern<Tuple6&lt;String, String, String, Double, Double, Double>, ?>
FlowFirstPattern = 
				Pattern.<Tuple6&lt;String, String, String, Double, Double,
Double>>begin("FlowOver10")
	  		    .where(new FilterFunction<Tuple6&lt;String, String, String, Double,
Double, Double>>() {

	  		    	private static final long serialVersionUID = 5861517245439863889L;

					@Override
					public boolean filter(Tuple6<String, String, String, Double, Double,
Double> value) throws Exception {

						double avgFlow= (value.f5/value.f4);
						
						return value.f2.contains("Flow target - Value") && avgFlow > 25.0;//
&& (value.f2 > avgFlow*1.0);
					}
				})
			    .followedBy("PressureOver10").where(new
FilterFunction<Tuple6&lt;String, String, String, Double, Double, Double>>()
{

					private static final long serialVersionUID = -4037517308930307522L;

					@Override
					public boolean filter(Tuple6<String, String, String, Double, Double,
Double> value) throws Exception {
						
						double avgPressure = (value.f5/value.f4);
						//System.out.println("Pressure: " + avgPressure);
						
						return value.f2.equals("Pressure target - Value") && (avgPressure >
5.0);// && (value.f2 > avgPressure*1.0);
					}
				})
			    .within(Time.seconds(10));	
		
		PatternStream<Tuple6&lt;String, String, String, Double, Double, Double>>
FlowFirstPatternStream = CEP.pattern(windowedData, FlowFirstPattern);
		DataStream<String> warning = FlowFirstPatternStream.select(new
PlacingWorkingTrocarWarning());
		warning.print();		
		
		env.execute();
	}
	
	private static class PlacingWorkingTrocarWarning implements
PatternSelectFunction<Tuple6&lt;String, String, String, Double, Double,
Double>, String> {

		private static final long serialVersionUID = 2576609635170800026L;

		@Override
		public String select(Map<String, Tuple6&lt;String, String, String, Double,
Double, Double>> pat) throws Exception {

			//Tuple5<String, String, Double, Double, Double> pressure =
pat.get("PressureOver10");
			//Tuple5<String, String, Double, Double, Double> flow =
pat.get("FlowOver10");
		
			return "  #######   Warning! FlowEvent   ####### ";			
		}
	}
	
	private static class pressureElementCount implements
FoldFunction<Tuple5&lt;String, String, String, String, Double>,
Tuple6<String, String, String, Double, Double, Double>>{

		private static final long serialVersionUID = -1081752808506520154L;

		@Override
		public Tuple6<String, String, String, Double, Double, Double>
fold(Tuple6<String, String, String, Double, Double, Double> init,
Tuple5<String, String, String, String, Double> val) throws Exception {
			
			double count = init.f4+1.0d;
			double sum = init.f5+val.f4; //!!!
			return new Tuple6<>(val.f0, val.f1, val.f3, val.f4, count, sum);
		}		
	}
	
	private static class SplitMapper extends RichFlatMapFunction<String,
Tuple5&lt;String, String, String, String, Double>> {

		private static final long serialVersionUID = 7297664214330222193L;
		
		@Override
		public void flatMap(String msg, Collector<Tuple5&lt;String, String,
String, String, Double>> out) throws Exception {
			
			EncodeValues enc = new EncodeValues();			
			getRuntimeContext().getLongCounter("eventCount").add(1L);
			String[] split_msg = msg.split("\t");
			String DeviceId = split_msg[1];
			String [] array = split_msg[3].split(", \"");
									
			for(String a:array){
									
					String[] split = a.split(":");
					String val = split[1];
					String testname = "test1";

					String nom = val.replace("\"", "").replace("{", "").replace("}",
"").replace(",", ".");
					String param = split[0].replace("\"", "").replace("{", "").replace("}",
"");
										
					double codedVal = enc.encode(nom);
					
					out.collect(new Tuple5<String, String, String, String,
Double>(UUID.randomUUID().toString(), testname, DeviceId, param, codedVal));
			}			
		}
	}	
}


Example data looks like this:
1> 00:36:06.459	1	2121	{"Pressure target - Value":"6", "Pressure target -
Unit":"mmHg"}
1> 00:36:06.463	1	2121	{"Flow target - Value":"23", "Flow target -
Unit":"l/min"}





--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-CEP-AbstractCEPPatternOperator-fail-after-event-detection-tp5948p5986.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.

Mime
View raw message