flink-user mailing list archives

From Daiqing Li <lidaiqing1...@gmail.com>
Subject Re: Keyed CEP checkpoint fails
Date Fri, 11 Aug 2017 00:40:41 GMT
Hi Dawid,

After rewriting hashCode with Objects.hash over all the fields, I still get the same error.
One odd thing: after many attempts, checkpoints always fail at 428. Does that mean
anything?
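
For illustration, here is a minimal sketch of one way a hashCode built with Objects.hash can still break a hash-based lookup. It is not taken from the attached MilestoneEvent.java: the Event class, its fields, and the ids map are hypothetical stand-ins, and this does not claim to be the actual cause inside SharedBuffer. It only shows that if any field feeding hashCode changes after the object has been put into a HashMap, the later lookup fails.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

final class MutableHashDemo {

    // Hypothetical event type with mutable fields (an assumption for this sketch).
    static final class Event {
        String name;
        long timestamp;

        Event(String name, long timestamp) {
            this.name = name;
            this.timestamp = timestamp;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (!(o instanceof Event)) return false;
            Event other = (Event) o;
            return timestamp == other.timestamp && Objects.equals(name, other.name);
        }

        @Override
        public int hashCode() {
            // Depends on every field, so it changes whenever a field changes.
            return Objects.hash(name, timestamp);
        }
    }

    // Stores an event under an id, mutates a field, then looks the event up again.
    static boolean foundAfterMutation() {
        Map<Event, Integer> ids = new HashMap<>();
        Event e = new Event("a", 1L);
        ids.put(e, 0);
        e.timestamp = 2L; // hashCode is now different from the one used at put()
        return ids.containsKey(e);
    }

    // Same lookup without touching the event in between.
    static boolean foundWithoutMutation() {
        Map<Event, Integer> ids = new HashMap<>();
        Event e = new Event("a", 1L);
        ids.put(e, 0);
        return ids.containsKey(e);
    }

    public static void main(String[] args) {
        System.out.println("found without mutation: " + foundWithoutMutation());
        System.out.println("found after mutation:   " + foundAfterMutation());
    }
}
```

If the real event class behaves like this sketch, checking whether any field used in hashCode can change between insertion and serialization (or making those fields final) would be a reasonable next debugging step.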
> On Aug 10, 2017, at 9:14 AM, Dawid Wysakowicz <wysakowicz.dawid@gmail.com> wrote:
> 
> Yes, with the information I have, my conclusion would be the same: I think the
> cause is a problem with hashCode. Unfortunately, without some data to reproduce it I won’t
> be able to help you further. I can only advise you to debug the method SharedBuffer#serialize
> and pay attention to the entryID map.
> 
>> On 10 Aug 2017, at 14:54, Daiqing Li <lidaiqing1993@gmail.com> wrote:
>> 
>> Oh sorry, the data in {} is not actually empty; I hid it because it contains private
>> information about my model. Do you still reach the same conclusion?
>>> On Aug 10, 2017, at 8:52 AM, Dawid Wysakowicz <wysakowicz.dawid@gmail.com> wrote:
>>> 
>>> You are right, I won’t be able to reproduce this problem without data. One
>>> thing I can say, though, is that I think the problem is indeed with the hashCode. Unfortunately
>>> I don’t know Gson, but one strange thing I noticed is the exception message:
>>> SharedBufferEntry(ValueTimeWrapper({}, 1502298303586, 0), [SharedBufferEdge(null, 1)], 1).
>>> The first {} is your event.toString, which seems odd, as if your event were empty.
>>> 
>>> Generally speaking, as I understand it, this exception is thrown because the hashCode
>>> of your event changes during serialization, so the lookup in an internal temporary cache
>>> fails.
>>> 
>>>> On 10 Aug 2017, at 14:29, Daiqing Li <lidaiqing1993@gmail.com> wrote:
>>>> 
>>>> Hi,
>>>> 
>>>> Here is the code. But I am not sure if you can reproduce the problem without
>>>> the data source.
>>>> 
>>>> Best,
>>>> Daiqing
>>>> 
>>>> On Thu, Aug 10, 2017 at 8:15 AM, Dawid Wysakowicz <wysakowicz.dawid@gmail.com> wrote:
>>>> As @Kostas asked in your previous thread, would it be possible for you to share
>>>> your code for that job, or at least a minimal example that reproduces this behaviour? I fear we
>>>> won’t be able to help you without any further info.
>>>> 
>>>> Regards,
>>>> Dawid
>>>> 
>>>>> On 10 Aug 2017, at 14:10, Daiqing Li <lidaiqing1993@gmail.com> wrote:
>>>>> 
>>>>> Hi Flink user,
>>>>> 
>>>>> I am using FsStateBackend and AWS EMR YARN to run a CEP job. I got this
>>>>> exception after it had been running for a while. Could anyone help me debug this? I tried
>>>>> parallelism 1, and it has the same problem. I also tried reimplementing the hashCode and
>>>>> equals methods. I use a UUID as the hashCode right now.
>>>>> 2017-08-09 18:15:04,572 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - KeyedCEPPatternOperator -> Map -> Sink: Unnamed (3/4) (d4749a4c3469732a2a5edf40b83f88d4) switched from RUNNING to FAILED.
>>>>> AsynchronousException{java.lang.Exception: Could not materialize checkpoint 946 for operator KeyedCEPPatternOperator -> Map -> Sink: Unnamed (3/4).}
>>>>>    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:970)
>>>>>    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>>>    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>>>    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>>>    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>>>    at java.lang.Thread.run(Thread.java:748)
>>>>> Caused by: java.lang.Exception: Could not materialize checkpoint 946 for operator KeyedCEPPatternOperator -> Map -> Sink: Unnamed (3/4).
>>>>>    ... 6 more
>>>>> Caused by: java.util.concurrent.ExecutionException: java.lang.IllegalStateException: Could not find id for entry: SharedBufferEntry(ValueTimeWrapper({}, 1502298303586, 0), [SharedBufferEdge(null, 1)], 1)
>>>>>    at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>>>>>    at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>>>>>    at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
>>>>>    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:897)
>>>>>    ... 5 more
>>>>>    Suppressed: java.lang.Exception: Could not properly cancel managed keyed state future.
>>>>>            at org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:90)
>>>>>            at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:1023)
>>>>>            at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:961)
>>>>>            ... 5 more
>>>>> 
>>>> 
>>>> 
>>>> <MilestoneEvent.java><example.java>
>>> 
>> 
> 

