flink-user-zh mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zhang yue <zhang...@silvrr.com>
Subject Re: Flink 状态使用问题咨询
Date Tue, 16 Apr 2019 12:44:26 GMT
Savepoint Reader/Writer 首先将外部数据写入savepoint,然后从savepoint启动吗

> 在 2019年4月16日,下午8:33,Congxian Qiu <qcx978132955@gmail.com> 写道:
> 
> Hi
> 如果你希望程序在刚开始运行的时候从外部存储加载数据,这个暂时做不到,不过现在社区正在做
Savepoint Reader/Writer 相关的事情,到时候就可以了
> 
> Best, Congxian
> On Apr 16, 2019, 20:27 +0800, zhang yue <zhangyue@silvrr.com>, wrote:
>> 你好,我有一个keyed state,当我启动flink程序时,我需要根据数据库中的数据给每个key加载不同的数值状态,我应该怎样来操作呢,在文档上找不到例子,我觉得应该是在open函数中设置初始状态,但是在这里获取不到key。
>> 
>> 
>> class LineItemStat extends RichFlatMapFunction<ObjectNode, JSONObject> {
>> 
>> /**
>> * The ValueState handle. The first field is the count, the second field a running
sum.
>> */
>> private transient MapState<String, Long> stat_value;
>> 
>> @Override
>> public void flatMap(ObjectNode input, Collector<JSONObject> out) throws Exception
{
>> 
>> // access the state value
>> 
>> }
>> 
>> @Override
>> public void open(Configuration config) {
>> MapStateDescriptor<String, Long> descriptor =
>> new MapStateDescriptor<String, Long>(
>> "stat_value",String.class, Long.class); // default value of the state, if nothing
was set
>> stat_value = getRuntimeContext().getMapState(descriptor);
>> }
>> }
>> 


Mime
View raw message