flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Dawid Wysakowicz <wysakowicz.da...@gmail.com>
Subject Re: CEP issue in 1.3.2. Does 1.4 fix this ?
Date Fri, 12 Jan 2018 08:24:18 GMT
Hi Vishal,
I think it might be due to this bug: https://issues.apache.org/jira/browse/FLINK-8226
It was merged for 1.4.1 and 1.5.0. Could you check with this changes applied? Would be really
helpful. If the error still persists could you file a jira?

Regards
Dawid

> On 11 Jan 2018, at 19:49, Vishal Santoshi <vishal.santoshi@gmail.com> wrote:
> 
> When checkpointing is turned on a simple CEP loop pattern
> 
>  private Pattern<Tuple2<Integer, SimpleBinaryEvent>, ?> alertPattern = Pattern.<Tuple2<Integer,
SimpleBinaryEvent>>begin("start").where(checkStatusOn)
>         .followedBy("middle").where(checkStatusOn).times(2)
>         .next("end").where(checkStatusOn).within(Time.minutes(5))
> 
> I see failures.
> 
> SimpleBinaryEvent is
> 
> public class SimpleBinaryEvent implements Serializable {
> 
> private int id;
> private int sequence;
> private boolean status;
> private long time;
> 
> public SimpleBinaryEvent(int id, int sequence, boolean status , long time) {
> 
> this.id
>  = id;
>     this.sequence = sequence;
>     this.status = status;
>     this.time = time;
> }
> public int getId() {
>     return id;
> }
> public int getSequence() {
>     return sequence;
> }
> public boolean isStatus() {
>     return status;
> }
> public long getTime() {
>     return time;
> }
> @Override
> public boolean equals(Object o) {
>     if (this == o) return true;
>     if (o == null || getClass() != o.getClass()) return false;
> 
>     SimpleBinaryEvent that = (SimpleBinaryEvent) o;
> 
>     if (getId() != that.getId()) return false;
>     if (isStatus() != that.isStatus()) return false;
>     if (getSequence() != that.getSequence()) return false;
>     return getTime() == that.getTime();
> }
> 
> @Override
> public int hashCode() {
>     //return Objects.hash(getId(),isStatus(), getSequence(),getTime());
>     int result = getId();
>     result = 31 * result + (isStatus() ? 1 : 0);
>     result = 31 * result + getSequence();
>     result = 31 * result + (int) (getTime() ^ (getTime() >>> 32));
>     return result;
> }
> 
> @Override
> public String toString() {
>     return "SimpleBinaryEvent{" +
>             "id='" + id + '\'' +
>             ", status=" + status +
>             ", sequence=" + sequence +
>             ", time=" + time +
>             '}';
> }
> 
> }
> 
> failure cause:
> 
> Caused by: java.lang.Exception: Could not materialize checkpoint 2 for operator KeyedCEPPatternOperator
-> Map (1/1).
> ... 6 more
> Caused by: java.util.concurrent.ExecutionException: java.lang.IllegalStateException:
Could not find id for entry: SharedBufferEntry(ValueTimeWrapper((1,SimpleBinaryEvent{id='1',
status=true, sequence=95, time=1505503380000}), 1505503380000, 0),....
> 
> I am sure I have the equals() and hashCode() implemented the way it should be. I have
tried the Objects.hashCode too. In other instances I have had CircularReference ( and thus
stackOverflow ) on SharedBuffer.toString(), which again points to issues with references (
equality and what not ). Without checkpointing turned on it works as expected. I am running
on a local cluster. Is CEP production ready ?
> 
> I am using 1.3.2 Flink
> 


Mime
View raw message