flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Yun Tang <myas...@live.com>
Subject Re: Using MapState clear, put methods in snapshotState within KeyedCoProcessFunction, valid or not?
Date Mon, 02 Dec 2019 03:30:39 GMT
Hi Salva

The root cause is that you did not figure out the difference between keyed state and operator

There is no ‘currentKey’ in operator state, which means PartitionableListState#clear()
will clear the whole state. However, there is always a ‘currentKey’ in keyed state, which
means ‘state#clear()’ would only remove the entry scoped to current runtime key. In your
example code, the state to clear is a MapState (not a list state) and therefore must be a
keyed state. If your job did not process any record, there would no ‘currentKey’ to be
set [1] for that ‘modelsBytes’ state which lead to the NPE when calling ‘state#clear()’.

Moreover, ‘snapshotState’ and ‘initializeState’ interfaces are used mainly to snapshot
and initialize for operator state.

Last but not least, even you could ensure at least one record processed before calling ‘snapshotState’,
it’s not clear for your program logic. You cannot control well which entry in you state
would be cleared since you cannot control the current key which set via processing record.

You could refer to TwoPhaseCommitSinkFunction [2] to figure out what state could be cleared
during snapshotStaet.

[1] https://github.com/apache/flink/blob/8c6cc4505a4c27daadb00cd94df8a7e955eb8d52/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java#L136
[2] https://github.com/apache/flink/blob/8c6cc4505a4c27daadb00cd94df8a7e955eb8d52/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java#L324

Yun Tang

From: Congxian Qiu <qcx978132955@gmail.com>
Date: Monday, December 2, 2019 at 10:41 AM
To: Salva Alcántara <salcantaraphd@gmail.com>
Cc: user <user@flink.apache.org>
Subject: Re: Using MapState clear, put methods in snapshotState within KeyedCoProcessFunction,
valid or not?


From the exception `No key set. This method should not be called outside of a keyed context.`
it means that the key current passed in is null. In my opinion, it's something wrong here
if there will throw an exception when no data arrive. could you please share the whole stack
and a minimal reproducible job for this issue?


Salva Alcántara <salcantaraphd@gmail.com<mailto:salcantaraphd@gmail.com>> 于2019年12月1日周日

class MyOperator extends KeyedCoProcessFunction[String, ModelDef, Data, Prediction]
  with CheckpointedFunction {

  // To hold loaded models
  @transient private var models: HashMap[(String, String), Model] = _

  // For serialization purposes
  @transient private var modelsBytes: MapState[(String, String), Array[Bytes]] = _


  override def snapshotState(context: FunctionSnapshotContext): Unit = {
    modelsBytes.clear() // This raises an exception when there is no active key set
    for ((k, model) <- models) {
      modelsBytes.put(k, model.toBytes(v))

  override def initializeState(context: FunctionInitializationContext): Unit = {
    modelsBytes = context.getKeyedStateStore.getMapState[String, String](
      new MapStateDescriptor("modelsBytes", classOf[String], classOf[String])

    if (context.isRestored) {
      // restore models from modelsBytes


It happens that `modelsBytes.clear()` raises an exception when there is no active key. This
happens when I start the application from scratch without any data on the input streams. So,
when the time for a checkpoint comes, I get this error:

`java.lang.NullPointerException: No key set. This method should not be called outside of a
keyed context.`

However, when the input stream contains data, checkpoints work just fine. I am a bit confused
about this because `snapshotState` does not provide a keyed context (contrary to `processElement1`
and `processElement2`, where the current key is accessible by doing `ctx.getCurrentKey`) so
it seems to me that the calls to `clear` and `put` within `snapshotState` should fail always
since they're supposed to work only within a keyed context. Can anyone clarify if this is
the expected behaviour actually?
View raw message