flink-user-zh mailing list archives

From Yun Tang <myas...@live.com>
Subject Re: [State Processor API] how to convert savepoint back to broadcast state
Date Mon, 27 Jan 2020 09:53:59 GMT
Hi Yi

Glad to know you have already resolved it. The State Processor API will use the DataStream API
instead of the DataSet API in the future [1].

Besides, you could also follow the guide in "The Broadcast State Pattern" [2]:


// a map descriptor to store the name of the rule (string) and the rule itself.
MapStateDescriptor<String, Rule> stateDescriptor = new MapStateDescriptor<>(
                        "RulesBroadcastState",
                        BasicTypeInfo.STRING_TYPE_INFO,
                        TypeInformation.of(new TypeHint<Rule>() {}));

// broadcast the rules and create the broadcast state
BroadcastStream<Rule> broadcastStream = ruleStream
                        .broadcast(stateDescriptor);


colorPartitionedStream
                 .connect(broadcastStream)
                 .process(
                     new KeyedBroadcastProcessFunction<Color, Item, Rule, String>() {
                         // my matching logic
                     }
                 ).uid("your-uid");

Make sure the uid and the state name are the same as those in your savepoint; the CoBroadcastWithKeyedOperator
will initialize the broadcast state when it opens [3].


[1] https://flink.apache.org/feature/2019/09/13/state-processor-api.html#why-dataset-api
[2] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/broadcast_state.html#provided-apis
[3] https://github.com/apache/flink/blob/53f956fb57dd5601d2e3ca9289207d21796cdc4d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithKeyedOperator.java#L101


Best
Yun Tang

________________________________
From: Jin Yi <eleanore.jin@gmail.com>
Sent: Monday, January 27, 2020 14:50
To: Yun Tang <myasuka@live.com>
Cc: user <user@flink.apache.org>; user-zh@flink.apache.org <user-zh@flink.apache.org>
Subject: Re: [State Processor API] how to convert savepoint back to broadcast state

Hi Yun,

After searching around in the documentation, I tried extending BroadcastProcessFunction and implementing
CheckpointedFunction. I initialized the broadcast state in the public void initializeState(FunctionInitializationContext
context) method, and it seems to be working fine.

Here is the doc I followed: https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html#checkpointedfunction

Thanks a lot for your help!
Eleanore

On Sun, Jan 26, 2020 at 6:53 PM Jin Yi <eleanore.jin@gmail.com> wrote:
Hi Yun,

Thanks for the response. I have checked the official documentation and followed this example
to write the broadcast state to a savepoint.

My question is: I can use the State Processor API to read the savepoint back into a DataSet, but
how can I use that DataSet as the initial value for the broadcast state in the BroadcastProcessFunction?

Thanks a lot!

Eleanore

On Sun, Jan 26, 2020 at 8:53 AM Yun Tang <myasuka@live.com> wrote:
Hi Yi

Does the official doc on writing broadcast state [1] satisfy your request?

[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/state_processor_api.html#broadcast-state-1

Best
Yun Tang
________________________________
From: Jin Yi <eleanore.jin@gmail.com>
Sent: Thursday, January 23, 2020 8:12
To: user <user@flink.apache.org>; user-zh@flink.apache.org <user-zh@flink.apache.org>
Subject: [State Processor API] how to convert savepoint back to broadcast state

Hi there,

I would like to read a savepoint (for broadcast state) back into the broadcast state. How
should I do it?


// load the existing savepoint
ExistingSavepoint existingSavepoint = Savepoint.load(environment, "file:///tmp/new_savepoints",
        new MemoryStateBackend());

// read the broadcast state from the existing savepoint as (key, value) tuples
DataSet<Tuple2<String, String>> dataSet = existingSavepoint.readBroadcastState(OPERATOR_UID,
        "largeKeySetStateDescription", BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);

// TODO: in BroadcastProcessFunction, how can I put the savepoint DataSet back into BroadcastState?

Thanks!

Eleanore
