flink-user mailing list archives

From Kostas Kloudas <k.klou...@data-artisans.com>
Subject Re: CEP issue in 1.3.2. Does 1.4 fix this ?
Date Tue, 23 Jan 2018 09:09:39 GMT
Hi Vishal,

Thanks for checking and glad to hear that your job works after the fix!

As for the equals/hashCode question: if you are asking whether you have to implement an exact
equals() method and a corresponding hashCode(), then the answer is yes. These methods are used
when retrieving and cleaning up “outdated” data from FlinkCEP’s internal data structures.
As a consequence, ambiguous implementations can lead to the wrong elements being cleaned up.
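
To illustrate the point, here is a minimal sketch in plain Java. It does not use FlinkCEP; a HashMap stands in for a hash-based internal buffer, and the class names LooseEvent and ExactEvent are hypothetical. It only shows how an equals()/hashCode() pair that ignores a field makes two different events collapse into one entry, so that a later lookup or cleanup for one of them also hits the other.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

public class EqualsDemo {

    /** Hypothetical event whose equals/hashCode deliberately ignore `sequence`. */
    static final class LooseEvent {
        final int id;
        final int sequence;

        LooseEvent(int id, int sequence) {
            this.id = id;
            this.sequence = sequence;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (!(o instanceof LooseEvent)) return false;
            return id == ((LooseEvent) o).id; // `sequence` is not compared
        }

        @Override
        public int hashCode() {
            return Integer.hashCode(id);
        }
    }

    /** Hypothetical event whose equals/hashCode cover every field. */
    static final class ExactEvent {
        final int id;
        final int sequence;

        ExactEvent(int id, int sequence) {
            this.id = id;
            this.sequence = sequence;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (!(o instanceof ExactEvent)) return false;
            ExactEvent that = (ExactEvent) o;
            return id == that.id && sequence == that.sequence;
        }

        @Override
        public int hashCode() {
            return Objects.hash(id, sequence);
        }
    }

    /** Stores two different events under the loose contract; returns the entry count. */
    public static int distinctLoose() {
        Map<LooseEvent, String> buffer = new HashMap<>();
        buffer.put(new LooseEvent(1, 94), "first");
        buffer.put(new LooseEvent(1, 95), "second"); // treated as the same key
        return buffer.size();
    }

    /** Stores the same two events under the exact contract; returns the entry count. */
    public static int distinctExact() {
        Map<ExactEvent, String> buffer = new HashMap<>();
        buffer.put(new ExactEvent(1, 94), "first");
        buffer.put(new ExactEvent(1, 95), "second");
        return buffer.size();
    }

    public static void main(String[] args) {
        System.out.println("loose entries: " + distinctLoose());
        System.out.println("exact entries: " + distinctExact());
    }
}
```

With the loose contract the two events share a single entry, while the exact contract keeps them apart. Any structure that looks elements up by equality will show the same ambiguity.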

Thanks, 
Kostas

> On Jan 21, 2018, at 3:32 PM, Vishal Santoshi <vishal.santoshi@gmail.com> wrote:
> 
> Have tested against the 1.5 SNAPSHOT (I simply pulled the source code into my distribution
> and compiled it into my job jar). Both the test code and the cluster seem to work OK. I have
> not tested the "savepoint and resume" mode, but restore from checkpoint works: I brought the
> JM down and restarted it. I still have to sanitize the output, but at least the exception is
> not thrown.
> 
> One thing though and please confirm
> 
> In CEP it seems that a POJO pushed into the window as part of a Pattern match has to have
> an "exact" equals/hashCode. In my case I had a custom equals/hashCode to enable "contains"
> in a different context; that is, I had deliberately left an instance variable out of the
> equals/hashCode contract. Is that a design decision or a requirement in CEP?
> 
> Thanks and Regards.
> 
> 
> 
> 
> On Sun, Jan 14, 2018 at 12:27 PM, Vishal Santoshi <vishal.santoshi@gmail.com> wrote:
> Will do.
> 
> On Sun, Jan 14, 2018 at 10:23 AM, Fabian Hueske <fhueske@gmail.com> wrote:
> We don't have a schedule for bugfix releases but do them based on need.
> AFAIK, a discussion about a 1.4.1 release has not been started yet.
> 
> Would you like to kick that off by sending a mail to the dev mailing list?
> 
> 
> 2018-01-12 16:41 GMT+01:00 Vishal Santoshi <vishal.santoshi@gmail.com>:
> Thanks.  We will.
> 
>    When is 1.4.1 scheduled for release ? 
> 
> On Fri, Jan 12, 2018 at 3:24 AM, Dawid Wysakowicz <wysakowicz.dawid@gmail.com> wrote:
> Hi Vishal,
> I think it might be due to this bug: https://issues.apache.org/jira/browse/FLINK-8226
> It was merged for 1.4.1 and 1.5.0. Could you check with these changes applied? That would be
> really helpful. If the error still persists, could you file a JIRA?
> 
> Regards
> Dawid
> 
> > On 11 Jan 2018, at 19:49, Vishal Santoshi <vishal.santoshi@gmail.com> wrote:
> >
> > When checkpointing is turned on, I see failures with a simple CEP loop pattern:
> >
> >  private Pattern<Tuple2<Integer, SimpleBinaryEvent>, ?> alertPattern =
> >         Pattern.<Tuple2<Integer, SimpleBinaryEvent>>begin("start").where(checkStatusOn)
> >         .followedBy("middle").where(checkStatusOn).times(2)
> >         .next("end").where(checkStatusOn).within(Time.minutes(5));
> >
> >
> > SimpleBinaryEvent is
> >
> > public class SimpleBinaryEvent implements Serializable {
> >
> > private int id;
> > private int sequence;
> > private boolean status;
> > private long time;
> >
> > public SimpleBinaryEvent(int id, int sequence, boolean status , long time) {
> >
> >     this.id = id;
> >     this.sequence = sequence;
> >     this.status = status;
> >     this.time = time;
> > }
> > public int getId() {
> >     return id;
> > }
> > public int getSequence() {
> >     return sequence;
> > }
> > public boolean isStatus() {
> >     return status;
> > }
> > public long getTime() {
> >     return time;
> > }
> > @Override
> > public boolean equals(Object o) {
> >     if (this == o) return true;
> >     if (o == null || getClass() != o.getClass()) return false;
> >
> >     SimpleBinaryEvent that = (SimpleBinaryEvent) o;
> >
> >     if (getId() != that.getId()) return false;
> >     if (isStatus() != that.isStatus()) return false;
> >     if (getSequence() != that.getSequence()) return false;
> >     return getTime() == that.getTime();
> > }
> >
> > @Override
> > public int hashCode() {
> >     //return Objects.hash(getId(),isStatus(), getSequence(),getTime());
> >     int result = getId();
> >     result = 31 * result + (isStatus() ? 1 : 0);
> >     result = 31 * result + getSequence();
> >     result = 31 * result + (int) (getTime() ^ (getTime() >>> 32));
> >     return result;
> > }
> >
> > @Override
> > public String toString() {
> >     return "SimpleBinaryEvent{" +
> >             "id='" + id + '\'' +
> >             ", status=" + status +
> >             ", sequence=" + sequence +
> >             ", time=" + time +
> >             '}';
> > }
> >
> > }
> >
> > failure cause:
> >
> > Caused by: java.lang.Exception: Could not materialize checkpoint 2 for operator KeyedCEPPatternOperator -> Map (1/1).
> > ... 6 more
> > Caused by: java.util.concurrent.ExecutionException: java.lang.IllegalStateException: Could not find id for entry: SharedBufferEntry(ValueTimeWrapper((1,SimpleBinaryEvent{id='1', status=true, sequence=95, time=1505503380000}), 1505503380000, 0),....
> >
> > I am sure I have equals() and hashCode() implemented the way they should be. I have
> > tried Objects.hash too. In other instances I have hit a circular reference (and thus a
> > stack overflow) in SharedBuffer.toString(), which again points to issues with references
> > (equality and so on). Without checkpointing turned on it works as expected. I am running
> > on a local cluster. Is CEP production ready?
> >
> > I am using Flink 1.3.2.
> >
> 
> 
> 
> 
> 

