flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Yun Tang <myas...@live.com>
Subject Re: Using MapState clear, put methods in snapshotState within KeyedCoProcessFunction, valid or not?
Date Wed, 08 Apr 2020 03:07:15 GMT
HI Salva

Sorry for missing your recent reply.
If you just want to make the models could be recoverable, you should choose operator state
to store the "models". If you stick to the keyed state, I cannot see why these models are
related to current processing key. As you can see, the "models" is just a HashMap[(String,
String), Model], and I don't know why we need to couple all models to just one specific key.

Best
Yun Tang
________________________________
From: Salva Alcántara <salcantaraphd@gmail.com>
Sent: Sunday, April 5, 2020 20:22
To: user@flink.apache.org <user@flink.apache.org>
Subject: Re: Using MapState clear, put methods in snapshotState within KeyedCoProcessFunction,
valid or not?

Hi Yun,

In the end, I left the code like this

```
override def snapshotState(context: FunctionSnapshotContext): Unit = {
    for ((k, model) <- models) {
      modelsBytes.put(k, model.toBytes(v))
    }
}
```

I have verified with a simple test that most of the times checkpoints seem
to work fine. However, from time to time, the map state is not saved
properly (getting an empty map state). So, looks like updating the keyed
state like that within the `snapshotState` method is conceptually wrong,
indeed this method does not receive any keyed context to start with. Because
of this, I think the user should not even be allowed to invoke `put` (`nor`
clear) on the map state object. That would help making things less
confusing.

The reason why I am trying to serialize my (keyed state) models inside
`snaphsotState` is because these models are self-evolving and possess their
own (time-varying) state, otherwise I could just serialize them once after
creation on `processElement1` method. So, given this situation, how could I
handle my use case? Ideally, I should only serialize them when checkpoints
are taken, in particular I want to avoid having to serialize them after
every element received in `processElement2` (the state of my models change
with each new element processed here).  Maybe I cannot achieve my goals with
keyed state and need operator state instead.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Mime
View raw message