flink-user mailing list archives

From Salva Alcántara <salcantara...@gmail.com>
Subject On efficient checkpoints with dynamic (self-evolving) keyed state
Date Mon, 06 Apr 2020 06:30:00 GMT
In a KeyedCoProcessFunction, I am managing a keyed state which consists of
third-party library models. These models are created on reception of new
data on the control stream within `processElement1`. Because the models are
self-evolving, in the sense that they have their own internal state, I need to
make sure that they are serialized into `modelsBytes` whenever their state
changes. My first attempt goes like this:

class MyOperator
  extends KeyedCoProcessFunction[String, Control, Data, Prediction]
    with CheckpointedFunction {

  // To hold loaded models
  @transient private var models: HashMap[String, Model] = _

  // For serialization purposes
  @transient private var modelsBytes: MapState[String, Array[Byte]] = _

  // Set in `initializeState` when the job is restored from a checkpoint
  @transient private var restoreModels: Boolean = false

  override def processElement1(control, ctx, ...) {
    if (restoreModels) { ... }
    // - Create new model out of `control` element
    // - Add it to `models` keyed state
  }

  override def processElement2(data, ctx, ...) {
    if (restoreModels) { ... }
    // - Send `data` element to the corresponding models
    // This will update their internal states
  }

  override def snapshotState(context: FunctionSnapshotContext): Unit = {
    // Suspicious, wishful-thinking code that compiles and runs just fine
    for ((k, model) <- models) {
      modelsBytes.put(k, model.toBytes)
    }
  }

  override def initializeState(context: FunctionInitializationContext): Unit = {
    modelsBytes = context.getKeyedStateStore.getMapState(
      new MapStateDescriptor("modelsBytes", classOf[String], classOf[Array[Byte]]))

    if (context.isRestored) restoreModels = true
  }
}


So, the idea is to use `snapshotState` to overwrite the *keyed* state entries
in `modelsBytes`. The reason I am trying this approach is that serializing
the models (`model.toBytes`) might be an expensive operation, so I would
prefer to do it once per model when a checkpoint comes.

The problem is that this approach might be inherently/conceptually wrong.
Here is why: even though the code within `snapshotState` compiles and runs
just fine, I am accessing a piece of keyed state without a keyed context
being passed in, so it is not clear at all which key I am really working on.
I have written a small test to verify the checkpoints, and I have observed
that from time to time I get an empty state back, even though the
`modelsBytes` entries were updated in `snapshotState`. So it seems that
snapshotting my models like this is not reliable at all.

What confuses me is that the user is perfectly allowed to do this. Maybe the
`put` method should raise an exception to make it clear that a keyed context
is required in the first place; otherwise it gives false hope and might lead
to hard-to-spot bugs. As a matter of fact, shouldn't this be considered a bug?
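
The concern about the missing key context can be illustrated with a toy
model in plain Scala. This is not Flink code: `ToyKeyedMapState`,
`setCurrentKey` and `entries` are hypothetical names, and the sketch assumes
that a write issued outside of element processing lands under whatever key
happened to be current last. Under that assumption, every model ends up in
the map of a single key, and the other keys read back an empty state.

```scala
import scala.collection.mutable

// Toy stand-in for a keyed MapState: every access is scoped to `currentKey`.
// Hypothetical class for illustration only, not part of Flink.
final class ToyKeyedMapState[K, UK, UV] {
  private val perKey = mutable.Map.empty[K, mutable.Map[UK, UV]]
  private var currentKey: Option[K] = None

  // In a real runtime this would be done by the framework before each element.
  def setCurrentKey(key: K): Unit = { currentKey = Some(key) }

  // Writes go to the map that belongs to the current key.
  def put(userKey: UK, value: UV): Unit = {
    val key = currentKey.getOrElse(
      throw new IllegalStateException("no current key set"))
    perKey.getOrElseUpdate(key, mutable.Map.empty[UK, UV]).update(userKey, value)
  }

  // What a given key would see when reading its own state back.
  def entries(key: K): Map[UK, UV] =
    perKey.get(key).map(_.toMap).getOrElse(Map.empty[UK, UV])
}

object SnapshotWithoutKeyContext {
  def run(): (Map[String, String], Map[String, String]) = {
    val state = new ToyKeyedMapState[String, String, String]
    val models = mutable.LinkedHashMap("a" -> "model-a", "b" -> "model-b")

    // Assumption: the last element processed before the checkpoint had key "b".
    state.setCurrentKey("b")

    // Mimics the loop in `snapshotState`: no key switch between models.
    for ((k, model) <- models) state.put(k, model)

    (state.entries("a"), state.entries("b"))
  }
}
```

In this sketch, key "a" sees nothing while key "b" holds both entries, which
would be consistent with occasionally getting an empty state back after a
restore.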

The other option I have is, of course, to serialize my models in
`processElement2`, after sending new data elements to them. However,
continuously serializing my models to update `modelsBytes` might be costly.
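
A minimal sketch of this second option, again in plain Scala rather than
Flink code, makes the cost visible. `ToyModel` is a hypothetical stand-in for
the third-party model (only `update` and `toBytes` are assumed), and the
`stored` map stands in for the keyed state, which would be written while the
element's key is the current one. The point is that `toBytes` runs once per
data element rather than once per model per checkpoint.

```scala
import scala.collection.mutable

// Hypothetical stand-in for a third-party model with internal state.
final class ToyModel(var sum: Long = 0L) {
  def update(x: Long): Unit = { sum += x }
  def toBytes: Array[Byte] = BigInt(sum).toByteArray
}

object SerializePerElement {
  // Returns (number of `toBytes` calls, decoded stored value per key).
  def run(events: Seq[(String, Long)]): (Int, Map[String, Long]) = {
    val models = mutable.Map.empty[String, ToyModel]
    val stored = mutable.Map.empty[String, Array[Byte]] // stands in for keyed state
    var toBytesCalls = 0

    for ((key, x) <- events) {
      val model = models.getOrElseUpdate(key, new ToyModel())
      model.update(x)
      // Serialize right after the update, while `key` is the element's key.
      stored(key) = model.toBytes
      toBytesCalls += 1
    }

    (toBytesCalls, stored.map { case (k, bytes) => k -> BigInt(bytes).toLong }.toMap)
  }
}
```

With three data elements spread over two keys, this performs three
serializations, whereas serializing only at checkpoint time would need two
(one per model). The gap grows with the number of elements between
checkpoints.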

What would be the most efficient way to handle this scenario?

Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
