flink-user-zh mailing list archives

From Jin Yi <eleanore....@gmail.com>
Subject Question regarding checkpoint/savepoint and State Processor API
Date Tue, 21 Jan 2020 00:07:28 GMT
Hi there,

1. In my job I have a broadcast stream. Initially there is no savepoint that
can be used to bootstrap the broadcast stream's state.
  BootstrapTransformation transform =

Savepoint.create(new MemoryStateBackend(), BROADCAST_PARALLELISM)
    .withOperator(OPERATOR_UID, transform)

Question: bootstrapWith(dataSet) is required. Normally the dataSet comes
from an old savepoint, but in this case I don't have one. How should I deal
with that? Or is the call strictly required?

2. As messages come through the broadcast stream, the state gets updated.

3. I would like to periodically save the broadcast state to a file via
Savepoint.create(new MemoryStateBackend(), BROADCAST_PARALLELISM)
.withOperator(OPERATOR_UID, transform)

4. When the job is cancelled and later restarted, the initial broadcast
state should be loaded from the previous savepoint.

ExistingSavepoint existingSavepoint = Savepoint.load(environment,
new MemoryStateBackend());

dataSet = existingSavepoint.readBroadcastState(OPERATOR_UID,

Question: assuming I now have the old state as a dataSet, how can I use it
in the BroadcastProcessFunction as the initial state of the broadcast state?
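
To make the lifecycle in steps 2 to 4 concrete, here is a minimal sketch in plain Java with no Flink dependency. It is only a model of the behaviour I am after, not State Processor API code: a Map stands in for the broadcast state, a local properties file stands in for the savepoint, and every name in it (BroadcastStateLifecycleSketch, restore, onBroadcastMessage, snapshot) is hypothetical.

```java
import java.io.IOException;
import java.io.Reader;
import java.io.Writer;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

// Hypothetical model only: a Map plays the role of the broadcast state and a
// properties file plays the role of the savepoint. No Flink API is used here.
public class BroadcastStateLifecycleSketch {

    private final Map<String, String> state = new HashMap<>();

    // Step 4: on (re)start, load the previous snapshot if one exists.
    // On the very first start there is none, so the state begins empty.
    public static BroadcastStateLifecycleSketch restore(Path snapshot) throws IOException {
        BroadcastStateLifecycleSketch job = new BroadcastStateLifecycleSketch();
        if (Files.exists(snapshot)) {
            Properties props = new Properties();
            try (Reader in = Files.newBufferedReader(snapshot, StandardCharsets.UTF_8)) {
                props.load(in);
            }
            for (String key : props.stringPropertyNames()) {
                job.state.put(key, props.getProperty(key));
            }
        }
        return job;
    }

    // Step 2: each message arriving on the broadcast side updates the state.
    public void onBroadcastMessage(String key, String value) {
        state.put(key, value);
    }

    // Step 3: periodically write the current state out to a file.
    public void snapshot(Path target) throws IOException {
        Properties props = new Properties();
        props.putAll(state);
        try (Writer out = Files.newBufferedWriter(target, StandardCharsets.UTF_8)) {
            props.store(out, "broadcast state snapshot (sketch)");
        }
    }

    // Returns a defensive copy so callers cannot mutate the internal state.
    public Map<String, String> currentState() {
        return new HashMap<>(state);
    }

    public static void main(String[] args) throws IOException {
        Path file = Files.createTempFile("broadcast-state", ".properties");
        Files.delete(file); // simulate the first start: no snapshot yet

        BroadcastStateLifecycleSketch first = restore(file);
        first.onBroadcastMessage("threshold", "10");
        first.snapshot(file);

        // Simulated restart: the state comes back from the snapshot file.
        BroadcastStateLifecycleSketch restarted = restore(file);
        System.out.println(restarted.currentState());
        Files.deleteIfExists(file);
    }
}
```

In this model the first start simply begins with empty state when no snapshot file exists, which is the behaviour I would like to reproduce with a real savepoint.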

Thanks a lot for the help!

