flink-user-zh mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "wenlong.lwl" <wenlong88....@gmail.com>
Subject Re: Flink 状态使用问题咨询
Date Wed, 17 Apr 2019 03:24:04 GMT
可以封装一下state 的访问,从state get不到数据的时候,去数据库里取下,更新到state里

On Tue, 16 Apr 2019 at 20:53, zhang yue <zhangyue@silvrr.com> wrote:

> 是的,我希望从mysql加载初始的状态,因为我的kafka消息是从某个时间点开始的,在这个时间点之前的数据需要先加载到flink
state
> 那现在对于这种场景有什么替代方案吗
>
> > 在 2019年4月16日,下午8:33,Congxian Qiu <qcx978132955@gmail.com>
写道:
> >
> > Hi
> > 如果你希望程序在刚开始运行的时候从外部存储加载数据,这个暂时做不到,不过现在社区正在做
Savepoint Reader/Writer
> 相关的事情,到时候就可以了
> >
> > Best, Congxian
> > On Apr 16, 2019, 20:27 +0800, zhang yue <zhangyue@silvrr.com>, wrote:
> >> 你好,我有一个keyed
> state,当我启动flink程序时,我需要根据数据库中的数据给每个key加载不同的数值状态,我应该怎样来操作呢,在文档上找不到例子,我觉得应该是在open函数中设置初始状态,但是在这里获取不到key。
> >>
> >>
> >> class LineItemStat extends RichFlatMapFunction<ObjectNode, JSONObject>
{
> >>
> >> /**
> >> * The ValueState handle. The first field is the count, the second field
> a running sum.
> >> */
> >> private transient MapState<String, Long> stat_value;
> >>
> >> @Override
> >> public void flatMap(ObjectNode input, Collector<JSONObject> out) throws
> Exception {
> >>
> >> // access the state value
> >>
> >> }
> >>
> >> @Override
> >> public void open(Configuration config) {
> >> MapStateDescriptor<String, Long> descriptor =
> >> new MapStateDescriptor<String, Long>(
> >> "stat_value",String.class, Long.class); // default value of the state,
> if nothing was set
> >> stat_value = getRuntimeContext().getMapState(descriptor);
> >> }
> >> }
> >>
>
>
Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message