flink-user-zh mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From 戴嘉诚 <a773807...@gmail.com>
Subject Re: Re: Flink checkpoint 并发问题
Date Thu, 25 Jul 2019 11:26:06 GMT
hi
    你好,我用的flink是1.8,但是是根据hadoop 2.7.3.2.6.5.0-292  自行打包的,
 operator state
descriptor是使用MapStateDescriptor<String, Long>,
谢谢!

Yun Tang <myasuka@live.com> 于2019年7月25日周四 下午7:10写道:

> Hi  all
>
> 你们讨论的已经越来越偏了,出问题的是operator state
> backend,实际上与RocksDB没有关系,与MaxConcurrentCheckpoints 也不应该有关系。
>
> To 戴嘉诚
> 你使用的Flink版本是什么?这个operator 里面的operator state descriptor是什么?
>
> 祝好
> 唐云
> ________________________________
> From: 戴嘉诚 <a773807943@gmail.com>
> Sent: Thursday, July 25, 2019 19:04
> To: user-zh@flink.apache.org <user-zh@flink.apache.org>
> Subject: Re: Re: Flink checkpoint 并发问题
>
> 这个用window不符合这个场景,因为是取每时每刻的最近半小时,而window只能是固定时间区间,例如window只能统计6点半至7点,
> 7点之7点半,而我的场景是需要6点到6点半,6点02分到6点32分,是根据数据的时间来确定区间,达到统计区间内的总数
>
> athlon512@gmail.com <athlon512@gmail.com>于2019年7月25日 周四18:50写道:
>
> > 那你用window和evictor 不可以吗?
> > 类似这样,因为我理解你的业务需求可以用这个来实现
> > 在flink源码中org.apache.flink.streaming.examples.windowing.TopSpeedWindowing
> >
> > ------------------------------
> > athlon512@gmail.com
> >
> >
> > *发件人:* 戴嘉诚 <a773807943@gmail.com>
> > *发送时间:* 2019-07-25 18:45
> > *收件人:* user-zh <user-zh@flink.apache.org>
> > *主题:* Re: Re: Flink checkpoint 并发问题
> >
> >
> >
> 这个应该不行,因为这个ttl是删除整个map状态,而我是需要最近半小时内的数据量,所以这个map状态,如果一直有数据写入,我就只需要获取最近的半小时,超过的就在map里面删除,这个ttl不能精确到map里面的具体的k-v
> > 对
> >
> > athlon512@gmail.com <athlon512@gmail.com> 于2019年7月25日周四 下午6:40写道:
> >
> > > 其实你可以不用自己删除.使用TTL设置短一些时间,试试
> > >
> > >
> > >
> > > athlon512@gmail.com
> > >
> > > 发件人: 戴嘉诚
> > > 发送时间: 2019-07-25 18:24
> > > 收件人: user-zh
> > > 主题: Re: Flink checkpoint 并发问题
> > > 你好,这个参数我没有设置过,对于checkpoint,我只是设置了rockDB增量写入,其他都没有设置了
> > >
> > > athlon512@gmail.com <athlon512@gmail.com> 于2019年7月25日周四 下午6:20写道:
> > >
> > > > setMaxConcurrentCheckpoints 这个参数你设置过么?
> > > >
> > > >
> > > >
> > > > athlon512@gmail.com
> > > >
> > > > 发件人: 戴嘉诚
> > > > 发送时间: 2019-07-25 18:07
> > > > 收件人: user-zh
> > > > 主题: Flink checkpoint 并发问题
> > > > 大家好:
> > > >
> > > >     我这里有个Job,会经常不定时的挂掉重启,具体的异常日志为一下,大致是checkpoint的同步失败的
> > > >
> > > >
> > > >
> > >
> >
> 在这个job中,具体是查询最近半小时的数据量是否达到配置的阈值,我使用了rockDB作为后端,然后中间会有个processElement方法会把Key出来的消息,在内部中的MapState堆积,同时也会每条消息过来,就遍历一遍MapState,来删除其中过时的数据。
> > > >
> > > >
> > > >
> > > >
> > >
> >
> 这个异常是不是我在ProcessElement方法内对MapState进行删除的过程中,checkpoint异步复制这个state到rockDB导致的?
> > > >
> > > >
> > > > java.lang.Exception: Could not perform checkpoint 550 for operator
> > > > KeyedProcess -> async wait operator -> Flat Map -> Sink: 写入redis库存
> > > > (16/20).
> > > >
> > > >          at
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:595)
> > > >
> > > >          at
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.io.BarrierBuffer.notifyCheckpoint(BarrierBuffer.java:396)
> > > >
> > > >          at
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:292)
> > > >
> > > >          at
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:200)
> > > >
> > > >          at org.apache.flink.streaming.runtime.io
> > > > .StreamInputProcessor.processInput(StreamInputProcessor.java:209)
> > > >
> > > >          at
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
> > > >
> > > >          at
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
> > > >
> > > >          at
> > org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
> > > >
> > > >          at java.lang.Thread.run(Thread.java:748)
> > > >
> > > > Caused by: java.lang.Exception: Could not complete snapshot 550 for
> > > > operator KeyedProcess -> async wait operator -> Flat Map -> Sink:
> > > > 写入redis库存 (16/20).
> > > >
> > > >          at
> > > >
> > >
> >
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:422)
> > > >
> > > >          at
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1113)
> > > >
> > > >          at
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1055)
> > > >
> > > >          at
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:729)
> > > >
> > > >          at
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:641)
> > > >
> > > >          at
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:586)
> > > >
> > > >          ... 8 more
> > > >
> > > > Caused by: java.util.ConcurrentModificationException
> > > >
> > > >          at
> java.util.HashMap$HashIterator.nextNode(HashMap.java:1442)
> > > >
> > > >          at java.util.HashMap$EntryIterator.next(HashMap.java:1476)
> > > >
> > > >          at java.util.HashMap$EntryIterator.next(HashMap.java:1474)
> > > >
> > > >          at
> > > >
> > >
> >
> com.esotericsoftware.kryo.serializers.MapSerializer.copy(MapSerializer.java:156)
> > > >
> > > >          at
> > > >
> > >
> >
> com.esotericsoftware.kryo.serializers.MapSerializer.copy(MapSerializer.java:21)
> > > >
> > > >          at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:862)
> > > >
> > > >          at
> > > >
> > >
> >
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:248)
> > > >
> > > >          at
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.copy(StreamElementSerializer.java:105)
> > > >
> > > >          at
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.copy(StreamElementSerializer.java:46)
> > > >
> > > >          at
> > > >
> > >
> >
> org.apache.flink.runtime.state.ArrayListSerializer.copy(ArrayListSerializer.java:73)
> > > >
> > > >          at
> > > >
> > >
> >
> org.apache.flink.runtime.state.PartitionableListState.<init>(PartitionableListState.java:68)
> > > >
> > > >          at
> > > >
> > >
> >
> org.apache.flink.runtime.state.PartitionableListState.deepCopy(PartitionableListState.java:80)
> > > >
> > > >          at
> > > >
> > >
> >
> org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy.snapshot(DefaultOperatorStateBackendSnapshotStrategy.java:88)
> > > >
> > > >          at
> > > >
> > >
> >
> org.apache.flink.runtime.state.DefaultOperatorStateBackend.snapshot(DefaultOperatorStateBackend.java:261)
> > > >
> > > >          at
> > > >
> > >
> >
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:402)
> > > >
> > > >          ... 13 more
> > > >
> > >
> >
> >
>
Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message