flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Aljoscha Krettek <aljos...@apache.org>
Subject Re: Missing MapState when Timer fires after restored state
Date Tue, 15 May 2018 06:22:44 GMT
Hi Juho,

As Sihua said, this shouldn't happen and indicates a bug. Did you only encounter this once
or can you easily reproduce the problem?


> On 15. May 2018, at 05:57, sihua zhou <summerleafs@163.com> wrote:
> Hi Juho,
> in fact, from your code I can't see any possible that the MapState could be inconsistency
with the timer, it's looks like a bug to me, because once the checkpoint's complete and you
haven't query the state in a customer thread async, then the result of the checkpoint should
be consistency. The only case, I can see where the timer could be inconsistency with state
is when the task is shutting down, that case the backend maybe already closed but the timer
failed to shutdown, so that the time callback function may access a closed backend. But it
shouldn't be reason of your case. Maybe, could you please provide us more information, like
what type of backend are you using? are you using the RocksDBBackend? and I think @Stefan
may tell more about this, and please correct me if I'm incorrect.
> Best,
> Sihua
> On 05/15/2018 01:48,Bowen Li<bowenli86@gmail.com> <mailto:bowenli86@gmail.com>
> Hi Juho,
> You are right, there's no transactional guarantee on timers and state in processElement().
They may end up with inconsistency if your job was cancelled in the middle of processing an
> To avoid the situation, the best programming practice is to always check if the state
you're trying to get is null or not.
> I've also created https://issues.apache.org/jira/browse/FLINK-9362 <https://issues.apache.org/jira/browse/FLINK-9362>
to document this. 
> Thanks
> Bowen
> On Mon, May 14, 2018 at 4:00 AM, Juho Autio <juho.autio@rovio.com <mailto:juho.autio@rovio.com>>
> We have a Flink streaming job (1.5-SNAPSHOT) that uses timers to clear old state. After
restoring state from a checkpoint, it seems like a timer had been restored, but not the data
that was expected to be in a related MapState if such timer has been added.
> The way I see this is that there's a bug, either of these:
> - The writing of timers & map states to Flink state is not synchronized (or maybe
there are no such guarantees by design?)
> - Flink may restore a checkpoint that is actually corrupted/incomplete
> Our code (simplified):
>     private MapState<String, String> mapState;
>     public void processElement(..) {
>             mapState.put("lastUpdated", ctx.timestamp().toString());
>             ctx.timerService().registerEventTimeTimer(ctx.timestamp() + stateRetentionMillis);
>     }
>     public void onTimer(long timestamp, OnTimerContext ctx, ..) {
>         long lastUpdated = Long.parseLong(mapState.get("lastUpdated"));
>         if (timestamp >= lastUpdated + stateRetentionMillis) {
>             mapState.clear();
>         }
>     }
> Normally this "just works". As you can see, it shouldn't be possible that "lastUpdated"
doesn't exist in state if timer was registered and onTimer gets called.
> However, after restoring state from a checkpoint, the job kept failing with this error:
> Caused by: java.lang.NumberFormatException: null
> at java.lang.Long.parseLong(Long.java:552)
> at java.lang.Long.parseLong(Long.java:631)
> at ..EnrichEventProcessFunction.onTimer(EnrichEventProcessFunction.java:136)
> ..
> So apparently onTimer was called but lastUpdated wasn't found in the MapState.
> The background for restoring state in this case is not entirely clean. There was an OS
level issue "Too many open files" after running a job for ~11 days. To fix that, we replaced
the cluster with a new one and launched the Flink job again. State was successfully restored
from the latest checkpoint that had been created by the "problematic execution". Now, I'm
assuming that if the state wouldn't have been created successfully, restoring wouldn't succeed
either – correct? This is just to rule out that the issue with state didn't happen because
the checkpoint files were somehow corrupted due to the Too many open files problem.
> Thank you all for your continued support!
> P.S. I would be very much interested to hear if there's some cleaner way to achieve this
kind of TTL for keyed state in Flink.

View raw message