flink-user mailing list archives

From Steve Bistline <srbistline.t...@gmail.com>
Subject Could not find previous entry with key.
Date Thu, 15 Nov 2018 05:10:12 GMT
Any thoughts on where to start with this error would be appreciated.

Caused by: java.lang.IllegalStateException: Could not find previous
entry with key: first event, value:
{"DEVICE_ID":f8a395a0-d3e2-11e8-b050-9779854d8172,"TIME_STAMP":11/15/2018
02:29:30.343 am,"TEMPERATURE":0.0,"HUMIDITY":"0.0","LIGHT_WHITE":0.0,"PROCX":0.0,"MOTION_DIRECTION":0,"MOTION_SPEED":0,"MOOD_STATE":0,"VISION_PEOPLE":0,"AUDIO1":0.0}
and timestamp: 1542248971585. This can indicate that either you did
not implement the equals() and hashCode() methods of your input
elements properly or that the element belonging to that entry has been
already pruned.
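
For context, the equals()/hashCode() contract the message refers to can be sketched as below. This is only a minimal illustration: `SensorEvent` and its three fields are hypothetical stand-ins, not the actual IoTEvent class from this job, and whether a missing implementation is what triggers the exception here is not established.

```java
import java.util.Objects;

// Hypothetical stand-in for an event class; the name and fields are assumptions
// made for illustration, not the real IoTEvent from the job above.
final class SensorEvent {
    private final String deviceId;
    private final long timestampMillis;
    private final int motionDirection;

    SensorEvent(String deviceId, long timestampMillis, int motionDirection) {
        this.deviceId = deviceId;
        this.timestampMillis = timestampMillis;
        this.motionDirection = motionDirection;
    }

    // Two events are equal when all of their fields are equal.
    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof SensorEvent)) {
            return false;
        }
        SensorEvent other = (SensorEvent) o;
        return timestampMillis == other.timestampMillis
                && motionDirection == other.motionDirection
                && Objects.equals(deviceId, other.deviceId);
    }

    // hashCode() must use the same fields as equals(), so that equal
    // events always produce the same hash.
    @Override
    public int hashCode() {
        return Objects.hash(deviceId, timestampMillis, motionDirection);
    }
}
```

With both methods overridden, two instances built from the same field values compare equal and hash identically, so a lookup keyed by one instance finds an entry stored under the other. Without them, Java falls back to object identity, and a deserialized copy of an event no longer matches the original.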


=====================================================

CODE HERE

=====================================================

       //kinesisConsumerConfig.list(System.out);

       // Consume the data streams from AWS Kinesis stream
       DataStream<Event> dataStream = env.addSource(new FlinkKinesisConsumer<>(
               pt.getRequired("stream"),
               new EventSchema(),
               kinesisConsumerConfig))
               .name("Kinesis Stream Consumer");

       System.out.printf("Print dataStream\n");
       //dataStream.print();

       DataStream<Event> kinesisStream = dataStream
               .assignTimestampsAndWatermarks(new TimeLagWatermarkGenerator())
               .map(event -> (IoTEvent) event);

       // Prints the mapped records from the Kinesis stream
       //kinesisStream.print();
       //System.out.printf("Print kinesisStream\n");

       Pattern<Event, ?> pattern = Pattern
               .<Event> begin("first event")
               .subtype(IoTEvent.class)
               .where(new IterativeCondition<IoTEvent>() {
                   private static final long serialVersionUID = -6301755149429716724L;

                   @Override
                   public boolean filter(IoTEvent value, Context<IoTEvent> ctx) throws Exception {
                       PatternConstants.MOTION_FIRST = value.getMotionDir();
                       return value.getMotionDir() != PatternConstants.MOTION_NA;
                   }
               })
               .next("second")
               .subtype(IoTEvent.class)
               .where(new IterativeCondition<IoTEvent>() {
                   private static final long serialVersionUID = 2392863109523984059L;

                   @Override
                   public boolean filter(IoTEvent value, Context<IoTEvent> ctx) throws Exception {
                       return value.getMotionDir() != PatternConstants.MOTION_NA
                               && value.getMotionDir() != PatternConstants.MOTION_FIRST;
                   }
               })
               .next("third")
               .subtype(IoTEvent.class)
               .where(new IterativeCondition<IoTEvent>() {
                   private static final long serialVersionUID = 2392863109523984059L;

                   @Override
                   public boolean filter(IoTEvent value, Context<IoTEvent> ctx) throws Exception {
                       return value.getMotionDir() != PatternConstants.MOTION_NA
                               && value.getMotionDir() == PatternConstants.MOTION_FIRST;
                   }
               })
               .next("fourth")
               .subtype(IoTEvent.class)
               .where(new IterativeCondition<IoTEvent>() {
                   private static final long serialVersionUID = 2392863109523984059L;

                   @Override
                   public boolean filter(IoTEvent value, Context<IoTEvent> ctx) throws Exception {
                       return value.getMotionDir() != PatternConstants.MOTION_NA
                               && value.getMotionDir() != PatternConstants.MOTION_FIRST;
                   }
               })
               .within(Time.seconds(10));
