flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Till Rohrmann <trohrm...@apache.org>
Subject Re: Flink CEP AbstractCEPPatternOperator fail after event detection
Date Thu, 07 Apr 2016 10:08:01 GMT
Hi Norman,

could you provide me an example input data set which produces the error?
E.g. the list of strings you inserted into Kafka/read from Kafka?

Cheers,
Till

On Thu, Apr 7, 2016 at 11:05 AM, norman sp <wir12kqe@studserv.uni-leipzig.de
> wrote:

> Hi Till,
> Thank you. Here's the code:
>
> public class CepStorzSimulator {
>
>         public static void main(String[] args) throws Exception {
>
>                         final ParameterTool parameterTool =
> ParameterTool.fromArgs(args);
>
>                         if(parameterTool.getNumberOfParameters() < 3) {
>                                 System.out.println("Missing
> parameters!\nUsage: Kafka --topic <topic>
> --bootstrap.servers <kafka brokers> --group.id <some id>");
>                                 System.exit(1);
>                         }
>
>                         CepStorzSimulator reader = new CepStorzSimulator();
>                 reader.run(parameterTool);
>         }
>
>         public void run(ParameterTool parameterTool) throws Exception {
>
>                 String topic = "test-simulator";
>
>                 StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
>                 //env.getConfig().disableSysoutLogging();
>
> env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4,
> 5000));
>                 //env.enableCheckpointing(15000);
>                                              // create a checkpoint every 5
> seconds
>                 env.setParallelism(4);
>
> env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
>
>         DataStream<String> kafkaStream = env.addSource(new
> FlinkKafkaConsumer08<>(topic, new SimpleStringSchema(),
> parameterTool.getProperties()));
>
>         DataStream<Tuple5<String, String, String, String, Double>> data
> =
> kafkaStream.flatMap(new SplitMapper());
>
>         SingleOutputStreamOperator<Tuple6<String, String, String,
> Double,
> Double, Double>> windowedData =
>                                  data.filter(new
> FilterFunction<Tuple5<String, String, String,
> String, Double>>() {
>
>                                         private static final long
> serialVersionUID = -5952425756492833594L;
>
>                                                 @Override
>                                                 public boolean
> filter(Tuple5<String, String, String, String, Double>
> val) throws Exception {
>
>                                                         return
> val.f3.contains("target - Value");
>                                                 }
>                                          })
>                                          .keyBy(3)
>                                          .timeWindow(Time.seconds(10),
> Time.seconds(1))
>                                          .fold(new Tuple6<>("", "", "",
> 0.0d, 0.0d, 0.0d), new
> pressureElementCount());
>
>                 windowedData.print();
>
>                 Pattern<Tuple6<String, String, String, Double, Double,
> Double>, ?>
> FlowFirstPattern =
>                                 Pattern.<Tuple6<String, String, String,
> Double, Double,
> Double>>begin("FlowOver10")
>                             .where(new FilterFunction<Tuple6<String,
> String, String, Double,
> Double, Double>>() {
>
>                                 private static final long serialVersionUID
> = 5861517245439863889L;
>
>                                         @Override
>                                         public boolean
> filter(Tuple6<String, String, String, Double, Double,
> Double> value) throws Exception {
>
>                                                 double avgFlow=
> (value.f5/value.f4);
>
>                                                 return
> value.f2.contains("Flow target - Value") && avgFlow > 25.0;//
> && (value.f2 > avgFlow*1.0);
>                                         }
>                                 })
>                             .followedBy("PressureOver10").where(new
> FilterFunction<Tuple6<String, String, String, Double, Double, Double>>()
> {
>
>                                         private static final long
> serialVersionUID = -4037517308930307522L;
>
>                                         @Override
>                                         public boolean
> filter(Tuple6<String, String, String, Double, Double,
> Double> value) throws Exception {
>
>                                                 double avgPressure =
> (value.f5/value.f4);
>
> //System.out.println("Pressure: " + avgPressure);
>
>                                                 return
> value.f2.equals("Pressure target - Value") && (avgPressure >
> 5.0);// && (value.f2 > avgPressure*1.0);
>                                         }
>                                 })
>                             .within(Time.seconds(10));
>
>                 PatternStream<Tuple6<String, String, String, Double,
> Double, Double>>
> FlowFirstPatternStream = CEP.pattern(windowedData, FlowFirstPattern);
>                 DataStream<String> warning =
> FlowFirstPatternStream.select(new
> PlacingWorkingTrocarWarning());
>                 warning.print();
>
>                 env.execute();
>         }
>
>         private static class PlacingWorkingTrocarWarning implements
> PatternSelectFunction<Tuple6<String, String, String, Double, Double,
> Double>, String> {
>
>                 private static final long serialVersionUID =
> 2576609635170800026L;
>
>                 @Override
>                 public String select(Map<String, Tuple6<String, String,
> String, Double,
> Double, Double>> pat) throws Exception {
>
>                         //Tuple5<String, String, Double, Double, Double>
> pressure =
> pat.get("PressureOver10");
>                         //Tuple5<String, String, Double, Double, Double>
> flow =
> pat.get("FlowOver10");
>
>                         return "  #######   Warning! FlowEvent   ####### ";
>                 }
>         }
>
>         private static class pressureElementCount implements
> FoldFunction<Tuple5<String, String, String, String, Double>,
> Tuple6<String, String, String, Double, Double, Double>>{
>
>                 private static final long serialVersionUID =
> -1081752808506520154L;
>
>                 @Override
>                 public Tuple6<String, String, String, Double, Double,
> Double>
> fold(Tuple6<String, String, String, Double, Double, Double> init,
> Tuple5<String, String, String, String, Double> val) throws Exception {
>
>                         double count = init.f4+1.0d;
>                         double sum = init.f5+val.f4; //!!!
>                         return new Tuple6<>(val.f0, val.f1, val.f3,
> val.f4, count, sum);
>                 }
>         }
>
>         private static class SplitMapper extends
> RichFlatMapFunction<String,
> Tuple5<String, String, String, String, Double>> {
>
>                 private static final long serialVersionUID =
> 7297664214330222193L;
>
>                 @Override
>                 public void flatMap(String msg,
> Collector<Tuple5<String, String,
> String, String, Double>> out) throws Exception {
>
>                         EncodeValues enc = new EncodeValues();
>
> getRuntimeContext().getLongCounter("eventCount").add(1L);
>                         String[] split_msg = msg.split("\t");
>                         String DeviceId = split_msg[1];
>                         String [] array = split_msg[3].split(", \"");
>
>                         for(String a:array){
>
>                                         String[] split = a.split(":");
>                                         String val = split[1];
>                                         String testname = "test1";
>
>                                         String nom = val.replace("\"",
> "").replace("{", "").replace("}",
> "").replace(",", ".");
>                                         String param =
> split[0].replace("\"", "").replace("{", "").replace("}",
> "");
>
>                                         double codedVal = enc.encode(nom);
>
>                                         out.collect(new Tuple5<String,
> String, String, String,
> Double>(UUID.randomUUID().toString(), testname, DeviceId, param,
> codedVal));
>                         }
>                 }
>         }
> }
>
>
> Example data looks like this:
> 1> 00:36:06.459 1       2121    {"Pressure target - Value":"6", "Pressure
> target -
> Unit":"mmHg"}
> 1> 00:36:06.463 1       2121    {"Flow target - Value":"23", "Flow target -
> Unit":"l/min"}
>
>
>
>
>
> --
> View this message in context:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-CEP-AbstractCEPPatternOperator-fail-after-event-detection-tp5948p5986.html
> Sent from the Apache Flink User Mailing List archive
> at Nabble.com.
>

Mime
View raw message