flink-user mailing list archives

From Vishal Santoshi <vishal.santo...@gmail.com>
Subject Re: CEP issue in 1.3.2. Does 1.4 fix this ?
Date Fri, 12 Jan 2018 15:41:21 GMT
Thanks, we will.

When is 1.4.1 scheduled for release?

On Fri, Jan 12, 2018 at 3:24 AM, Dawid Wysakowicz <wysakowicz.dawid@gmail.com> wrote:

> Hi Vishal,
> I think it might be due to this bug:
> https://issues.apache.org/jira/browse/FLINK-8226
> The fix was merged for 1.4.1 and 1.5.0. Could you check with these changes
> applied? That would be really helpful. If the error still persists, could you
> file a JIRA?
>
> Regards
> Dawid
>
> > On 11 Jan 2018, at 19:49, Vishal Santoshi <vishal.santoshi@gmail.com> wrote:
> >
> > With checkpointing turned on and a simple CEP loop pattern,
> >
> > private Pattern<Tuple2<Integer, SimpleBinaryEvent>, ?> alertPattern =
> >     Pattern.<Tuple2<Integer, SimpleBinaryEvent>>begin("start").where(checkStatusOn)
> >         .followedBy("middle").where(checkStatusOn).times(2)
> >         .next("end").where(checkStatusOn).within(Time.minutes(5));
> >
> > I see failures.
> >
> > SimpleBinaryEvent is
> >
> > public class SimpleBinaryEvent implements Serializable {
> >
> > private int id;
> > private int sequence;
> > private boolean status;
> > private long time;
> >
> > public SimpleBinaryEvent(int id, int sequence, boolean status, long time) {
> >     this.id = id;
> >     this.sequence = sequence;
> >     this.status = status;
> >     this.time = time;
> > }
> > public int getId() {
> >     return id;
> > }
> > public int getSequence() {
> >     return sequence;
> > }
> > public boolean isStatus() {
> >     return status;
> > }
> > public long getTime() {
> >     return time;
> > }
> > @Override
> > public boolean equals(Object o) {
> >     if (this == o) return true;
> >     if (o == null || getClass() != o.getClass()) return false;
> >
> >     SimpleBinaryEvent that = (SimpleBinaryEvent) o;
> >
> >     if (getId() != that.getId()) return false;
> >     if (isStatus() != that.isStatus()) return false;
> >     if (getSequence() != that.getSequence()) return false;
> >     return getTime() == that.getTime();
> > }
> >
> > @Override
> > public int hashCode() {
> >     //return Objects.hash(getId(),isStatus(), getSequence(),getTime());
> >     int result = getId();
> >     result = 31 * result + (isStatus() ? 1 : 0);
> >     result = 31 * result + getSequence();
> >     result = 31 * result + (int) (getTime() ^ (getTime() >>> 32));
> >     return result;
> > }
> >
> > @Override
> > public String toString() {
> >     return "SimpleBinaryEvent{" +
> >             "id='" + id + '\'' +
> >             ", status=" + status +
> >             ", sequence=" + sequence +
> >             ", time=" + time +
> >             '}';
> > }
> >
> > }
> >
> > failure cause:
> >
> > Caused by: java.lang.Exception: Could not materialize checkpoint 2 for operator KeyedCEPPatternOperator -> Map (1/1).
> > ... 6 more
> > Caused by: java.util.concurrent.ExecutionException: java.lang.IllegalStateException: Could not find id for entry: SharedBufferEntry(ValueTimeWrapper((1,SimpleBinaryEvent{id='1', status=true, sequence=95, time=1505503380000}), 1505503380000, 0),....
> >
> > I am sure I have equals() and hashCode() implemented the way they
> > should be. I have tried Objects.hashCode too. In other instances I have
> > hit a circular reference (and thus a stack overflow) in
> > SharedBuffer.toString(), which again points to issues with references
> > (equality and so on). Without checkpointing turned on, it works as
> > expected. I am running on a local cluster. Is CEP production ready?
> >
> > I am using Flink 1.3.2.
> >
>
>
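
The original report questions whether equals() and hashCode() on the event class are to blame. A quick standalone check can help rule that out before upgrading. The sketch below is not from the thread: `EqualsHashCodeCheck`, `Event`, `roundTrip` and `contractHolds` are hypothetical names, `Event` is a trimmed stand-in for SimpleBinaryEvent with the same four fields, and the copy is made with plain JDK serialization, which is an assumption and may differ from the serializer Flink actually uses for this type.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Objects;

public class EqualsHashCodeCheck {

    // Hypothetical, trimmed stand-in for SimpleBinaryEvent (same four fields).
    static final class Event implements Serializable {
        private static final long serialVersionUID = 1L;
        final int id;
        final int sequence;
        final boolean status;
        final long time;

        Event(int id, int sequence, boolean status, long time) {
            this.id = id;
            this.sequence = sequence;
            this.status = status;
            this.time = time;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (o == null || getClass() != o.getClass()) return false;
            Event that = (Event) o;
            return id == that.id && sequence == that.sequence
                    && status == that.status && time == that.time;
        }

        @Override
        public int hashCode() {
            return Objects.hash(id, status, sequence, time);
        }
    }

    // Serialize and deserialize with JDK serialization to obtain a distinct copy.
    static Event roundTrip(Event e) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(e);
        }
        try (ObjectInputStream in =
                new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (Event) in.readObject();
        }
    }

    // equals() must be symmetric, and equal objects must share a hash code.
    static boolean contractHolds(Event a, Event b) {
        return a.equals(b) && b.equals(a) && a.hashCode() == b.hashCode();
    }

    public static void main(String[] args) throws Exception {
        Event original = new Event(1, 95, true, 1505503380000L);
        Event copy = roundTrip(original);
        System.out.println("distinct instances: " + (original != copy));
        System.out.println("contract holds: " + contractHolds(original, copy));
    }
}
```

If both lines print true, the value class itself behaves as expected after a serialization round trip. That would be consistent with the failure originating elsewhere, for example in the bug Dawid links, though this check alone does not confirm it.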
