flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "sihua zhou" <summerle...@163.com>
Subject Re: Missing MapState when Timer fires after restored state
Date Wed, 16 May 2018 09:30:23 GMT
Hi, Juho

> If restoring + rescaling a checkpoint is not supported properly, I don't understand why
Flink doesn't entirely refuse to restore in that case?

I think you're asking the question I have asked in https://github.com/apache/flink/pull/5490,
you can refer to it and find the comments there.

@Stefan, PR(https://github.com/apache/flink/pull/6020) has been prepared.

Best, Sihua

On 05/16/2018 17:20,Juho Autio<juho.autio@rovio.com> wrote:
Yes, I'm rescaling from a checkpoint.

> that behavior is not guaranteed yet

If restoring + rescaling a checkpoint is not supported properly, I don't understand why Flink
doesn't entirely refuse to restore in that case?

Note that I'm using a rather new 1.5-SNAPSHOT, not release 1.4. To be exact, the package was
built at flink commit 8395508b0401353ed07375e22882e7581d46ac0e.

I also made this additional test:
- launch again from the checkpoint with the original parallelism=8
- cancel with savepoint (this was after ~7 minutes, during which the job also created some
new incremental checkpoints
- restore the newly created savepoint with parallelism=16
- it works

Now I have restored from the original checkpoint multiple times and it consistently fails
with that java.lang.NumberFormatException when trying with parallelism=18.

I was able to enable trace-level logs with this change (nothing changed in logback.xml):

Added these lines to flink-1.5-SNAPSHOT/conf/log4j.properties:

# To debug checkpoint restoring

(This is confusing to me – I had understood that Flink would have migrated entirely to use
logback configuration instead of log4j?)

I modified the logs to remove sensitive information, but because I'm not 100% sure that I
caught everything, I will share the logs personally with Stefan only.

On Wed, May 16, 2018 at 5:22 AM, sihua zhou <summerleafs@163.com> wrote:

Hi Juho,
if I'm not misunderstand, you saied your're rescaling the job from the checkpoint? If yes,
I think that behavior is not guaranteed yet, you can find this on the doc https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/state/checkpoints.html#difference-to-savepoints.
So, I not sure whether this is a "bug" at current stage(personally I'd like to dig it out
because currently we also use the checkpoint like the way you are) ...

Best, Sihua

On 05/16/2018 01:46,Juho Autio<juho.autio@rovio.com> wrote:
I was able to reproduce this error.

I just happened to notice an important detail about the original failure:

- checkpoint was created with a 1-node cluster (parallelism=8)
- restored on a 2-node cluster (parallelism=16), caused that null exception

I tried restoring again from the problematic checkpoint again

- restored on a 1-node cluster, no problems
- restored on a 2-node cluster, getting the original error!

So now I have a way to reproduce the bug. To me it seems like the checkpoint itself is fine.
The bug seems to be in redistributing the state of a restored checkpoint to a higher parallelism.
I only tested each cluster size once (as described above) so it could also be coincidence,
but seems at least likely now that it's about the state redistribution.

I'll try to follow up with those TRACE-level logs tomorrow. Today I tried adding these to
the logback.xml, but I didn't get anything else but INFO level logs:

    <logger name="org.apache.flink.contrib.streaming.state" level="TRACE">
        <appender-ref ref="file"/>
    <logger name="org.apache.flink.runtime.state" level="TRACE">
        <appender-ref ref="file"/>
    <logger name="org.apache.flink.runtime.checkpoint" level="TRACE">
        <appender-ref ref="file"/>
    <logger name="org.apache.flink.streaming.api.operators" level="TRACE">
        <appender-ref ref="file"/>
    <logger name="org.apache.flink.streaming.runtime.tasks" level="TRACE">
        <appender-ref ref="file"/>

Maybe I need to edit the log4j.properties instead(?). Indeed it's Flink 1.5-SNAPSHOT and the
package has all of these in the conf/ dir:


On Tue, May 15, 2018 at 11:49 AM, Stefan Richter <s.richter@data-artisans.com> wrote:


Am 15.05.2018 um 10:34 schrieb Juho Autio <juho.autio@rovio.com>:

Ok, that should be possible to provide. Are there any specific packages to set on trace level?
Maybe just go with org.apache.flink.* on TRACE?

The following packages would be helpful:


> did the „too many open files“ problem only happen with local recovery (asking since
it should actually not add the the amount of open files)

I think it happened in various places, maybe not when restoring.. Any way if the situation
is like that, the system is pretty much unusable (on OS level), so it shouldn't matter too
much which operation of the application it causes to fail? Any way I'll try to grab &
share all log lines that say "Too Many Open Files"..

> and did you deactivate it on the second cluster for the restart or changed your OS settings?

No, didn't change anything except for increasing the ulimit on OS to prevent this from happening
again. Note that the system only ran out of files after ~11 days of uptime. During that time
there had been some local recoveries. This makes me wonder though, could it be that many local
recoveries eventually caused this – could it be that in the occasion of local recovery some
"old" files are left open, making the system eventually run out of files?

From the way how local recovery works with incremental RocksDB checkpoints, I would not assume
that it is the cause of the problem. In this particular case, the number of opened files on
a local FS should not be higher than the number without local recovery. Maybe it is just a
matter of the OS limit and the number of operators with a RocksDB backend running on the machine
and the amount of files managed by all those RocksDB instances that simply exceed the limit.
If you have an overview how many parallel operator instances with keyed state were running
on the machine and assume some reasonable number of files per RocksDB instance and the limit
configured in your OS, could that be the case?


Thanks for your help!

On Tue, May 15, 2018 at 11:17 AM, Stefan Richter <s.richter@data-artisans.com> wrote:

Btw having a trace level log of a restart from a problematic checkpoint could actually be
helpful if we cannot find the problem from the previous points. This can give a more detailed
view of what checkpoint files are mapped to which operator.

I am having one more question: did the „too many open files“ problem only happen with
local recovery (asking since it should actually not add the the amount of open files), and
did you deactivate it on the second cluster for the restart or changed your OS settings?

Am 15.05.2018 um 10:09 schrieb Stefan Richter <s.richter@data-artisans.com>:

What I would like to see from the logs is (also depending a bit on your log level):

- all exceptions.
- in which context exactly the „too many open files“ problem occurred, because I think
for checkpoint consistency it should not matter as a checkpoint with such a problem should
never succeed.
- files that are written for checkpoints/savepoints.
- completed checkpoints/savepoints ids.
- the restored checkpoint/savepoint id.
- files that are loaded on restore.

Am 15.05.2018 um 10:02 schrieb Juho Autio <juho.autio@rovio.com>:

Thanks all. I'll have to see about sharing the logs & configuration..

Is there something special that you'd like to see from the logs? It may be easier for me to
get specific lines and obfuscate sensitive information instead of trying to do that for the
full logs.

We basically have: RocksDBStateBackend with enableIncrementalCheckpointing=true, external
state path on s3.

The code that we use is:

            env.setStateBackend(getStateBackend(statePath, new RocksDBStateBackend(statePath,
60 * 1000));
10 * 60 * 1000));

The problematic state that we tried to use was a checkpoint created with this conf.

> Are you using the local recovery feature?

Yes, and in this particular case the job was constantly failing/restarting because of Too
Many Open Files. So we terminated the cluster entirely, created a new one, and launched a
new job by specifying the latest checkpoint path to restore state from.

This is the only time I have seen this error happen with timer state. I still have that bad
checkpoint data on s3, so I might be able to try to restore it again if needed to debug it.
But that would require some tweaking, because I don't want to tangle with the same kafka consumer
group offsets or send old data again to production endpoint.

Please keep in mind that there was that Too Many Open Files issue on the cluster that created
the problematic checkpoint, if you think that's relevant.

On Tue, May 15, 2018 at 10:39 AM, Stefan Richter <s.richter@data-artisans.com> wrote:


I agree, this looks like a bug. Can you tell us your exact configuration of the state backend,
e.g. if you are using incremental checkpoints or not. Are you using the local recovery feature?
Are you restarting the job from a checkpoint or a savepoint? Can you provide logs for both
the job that failed and the restarted job?


Am 14.05.2018 um 13:00 schrieb Juho Autio <juho.autio@rovio.com>:

We have a Flink streaming job (1.5-SNAPSHOT) that uses timers to clear old state. After restoring
state from a checkpoint, it seems like a timer had been restored, but not the data that was
expected to be in a related MapState if such timer has been added.

The way I see this is that there's a bug, either of these:
- The writing of timers & map states to Flink state is not synchronized (or maybe there
are no such guarantees by design?)
- Flink may restore a checkpoint that is actually corrupted/incomplete

Our code (simplified):

    private MapState<String, String> mapState;

    public void processElement(..) {
            mapState.put("lastUpdated", ctx.timestamp().toString());
            ctx.timerService().registerEventTimeTimer(ctx.timestamp() + stateRetentionMillis);

    public void onTimer(long timestamp, OnTimerContext ctx, ..) {

        long lastUpdated = Long.parseLong(mapState.get("lastUpdated"));
        if (timestamp >= lastUpdated + stateRetentionMillis) {

Normally this "just works". As you can see, it shouldn't be possible that "lastUpdated" doesn't
exist in state if timer was registered and onTimer gets called.

However, after restoring state from a checkpoint, the job kept failing with this error:

Caused by: java.lang.NumberFormatException: null
at java.lang.Long.parseLong(Long.java:552)
at java.lang.Long.parseLong(Long.java:631)
at ..EnrichEventProcessFunction.onTimer(EnrichEventProcessFunction.java:136)

So apparently onTimer was called but lastUpdated wasn't found in the MapState.

The background for restoring state in this case is not entirely clean. There was an OS level
issue "Too many open files" after running a job for ~11 days. To fix that, we replaced the
cluster with a new one and launched the Flink job again. State was successfully restored from
the latest checkpoint that had been created by the "problematic execution". Now, I'm assuming
that if the state wouldn't have been created successfully, restoring wouldn't succeed either
– correct? This is just to rule out that the issue with state didn't happen because the
checkpoint files were somehow corrupted due to the Too many open files problem.

Thank you all for your continued support!

P.S. I would be very much interested to hear if there's some cleaner way to achieve this kind
of TTL for keyed state in Flink.

View raw message