flink-user mailing list archives

From Dawid Wysakowicz <wysakowicz.da...@gmail.com>
Subject Re: Keyed CEP checkpoint fails
Date Thu, 10 Aug 2017 13:14:03 GMT
Yes, with the information I have, my conclusion is the same: I think the cause is a problem
with the hashcode. Unfortunately, without some data to reproduce it I won’t be able to help
you further. I can only advise you to debug the method SharedBuffer#serialize and pay
attention to the entryID map.
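
As a rough illustration of the suspected failure mode (not Flink's actual code): if an object is registered as a key in a hash-based map and its hashcode later changes, a lookup with that same object can no longer find it. In the minimal sketch below, HashCodeStabilityDemo, MutableEvent and the ids map are hypothetical names invented for the example; the map only stands in for an id cache of the kind mentioned above, under the assumption that it is keyed by objects whose hash depends on the event.

```java
import java.util.HashMap;
import java.util.Map;

public class HashCodeStabilityDemo {

    /** Hypothetical event whose hashCode depends on a mutable field. */
    static final class MutableEvent {
        int version;

        MutableEvent(int version) {
            this.version = version;
        }

        @Override
        public boolean equals(Object o) {
            return o instanceof MutableEvent && ((MutableEvent) o).version == version;
        }

        @Override
        public int hashCode() {
            return Integer.hashCode(version);
        }
    }

    /** Registers an event under id 0, mutates it, then looks it up again. */
    static Integer lookupAfterMutation() {
        Map<MutableEvent, Integer> ids = new HashMap<>();
        MutableEvent event = new MutableEvent(1);
        ids.put(event, 0);
        event.version = 2; // the hashCode changes while the event is still a key
        return ids.get(event); // the entry was stored under the old hash
    }

    /** Same steps without the mutation, for comparison. */
    static Integer lookupWithoutMutation() {
        Map<MutableEvent, Integer> ids = new HashMap<>();
        MutableEvent event = new MutableEvent(1);
        ids.put(event, 0);
        return ids.get(event);
    }

    public static void main(String[] args) {
        System.out.println("after mutation:   " + lookupAfterMutation());
        System.out.println("without mutation: " + lookupWithoutMutation());
    }
}
```

The first lookup returns null even though the very same object is still in the map, which is the situation a "Could not find id for entry" check would report. A hashcode derived only from fields that survive serialization unchanged avoids this.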

> On 10 Aug 2017, at 14:54, Daiqing Li <lidaiqing1993@gmail.com> wrote:
> 
> Oh sorry, the data in {} is not actually empty; I removed it to hide private information
> about my model. Do you still come to the same conclusion?
>> On Aug 10, 2017, at 8:52 AM, Dawid Wysakowicz <wysakowicz.dawid@gmail.com> wrote:
>> 
>> You are right, I won’t be able to reproduce this problem without data. One thing I can
>> say, though, is that I think the problem is indeed with the hashcode. Unfortunately I don’t
>> know Gson, but one strange thing I noticed is the exception message:
>> SharedBufferEntry(ValueTimeWrapper({}, 1502298303586, 0), [SharedBufferEdge(null, 1)], 1).
>> The first {} is your event.toString, which seems odd, as if your event were empty.
>> 
>> Generally speaking, as I understand it, this exception is thrown because the hashcode
>> of your event changes during serialization, so the lookup in an internal temporary cache
>> fails.
>> 
>>> On 10 Aug 2017, at 14:29, Daiqing Li <lidaiqing1993@gmail.com> wrote:
>>> 
>>> Hi,
>>> 
>>> Here is the code. But I am not sure if you can reproduce the problem without the
>>> data source.
>>> 
>>> Best,
>>> Daiqing
>>> 
>>> On Thu, Aug 10, 2017 at 8:15 AM, Dawid Wysakowicz <wysakowicz.dawid@gmail.com> wrote:
>>> As @Kostas asked in your previous thread, would it be possible for you to share your
>>> code for that job, or at least a minimal example that reproduces this behaviour? I fear
>>> we won’t be able to help you without further information.
>>> 
>>> Regards,
>>> Dawid
>>> 
>>>> On 10 Aug 2017, at 14:10, Daiqing Li <lidaiqing1993@gmail.com> wrote:
>>>> 
>>>> Hi Flink user,
>>>> 
>>>> I am using FsStateBackend and AWS EMR YARN to run a CEP job. I got this exception
>>>> after it had been running for a while. Could anyone help me debug this? I tried
>>>> parallelism 1, and it has the same problem. I also tried reimplementing the hashCode
>>>> and equals methods. I use a UUID as the hashcode right now.
>>>> 2017-08-09 18:15:04,572 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - KeyedCEPPatternOperator -> Map -> Sink: Unnamed (3/4) (d4749a4c3469732a2a5edf40b83f88d4) switched from RUNNING to FAILED.
>>>> AsynchronousException{java.lang.Exception: Could not materialize checkpoint 946 for operator KeyedCEPPatternOperator -> Map -> Sink: Unnamed (3/4).}
>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:970)
>>>>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>>     at java.lang.Thread.run(Thread.java:748)
>>>> Caused by: java.lang.Exception: Could not materialize checkpoint 946 for operator KeyedCEPPatternOperator -> Map -> Sink: Unnamed (3/4).
>>>>     ... 6 more
>>>> Caused by: java.util.concurrent.ExecutionException: java.lang.IllegalStateException: Could not find id for entry: SharedBufferEntry(ValueTimeWrapper({}, 1502298303586, 0), [SharedBufferEdge(null, 1)], 1)
>>>>     at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>>>>     at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>>>>     at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:897)
>>>>     ... 5 more
>>>>     Suppressed: java.lang.Exception: Could not properly cancel managed keyed state future.
>>>>             at org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:90)
>>>>             at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:1023)
>>>>             at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:961)
>>>>             ... 5 more
>>>> 
>>> 
>>> 
>>> <MilestoneEvent.java><example.java>
>> 
> 

