flink-user-zh mailing list archives

From 戴嘉诚 <a773807...@gmail.com>
Subject Flink checkpoint concurrency issue
Date Thu, 25 Jul 2019 10:07:40 GMT
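Hi all,

    I have a job that frequently fails and restarts at irregular intervals. The exception log is below; roughly, the synchronous part of the checkpoint is failing.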

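In this job, I check whether the volume of data over the last half hour has reached a configured threshold. I use RocksDB as the state backend. A processElement method accumulates the keyed messages in a MapState, and for every incoming message it also iterates over the whole MapState to delete expired entries.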
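
The per-message pruning described above can be sketched roughly as follows. This is a plain-Java illustration, not the job's actual code: a `java.util.Map` stands in for Flink's `MapState`, and the class name `WindowPruner`, the timestamp-to-count layout, and the `WINDOW_MILLIS` constant are assumptions made for the example. Stale entries are removed through the iterator, which is the safe way to delete while traversing.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

public class WindowPruner {
    // Assumed retention window: the post speaks of "the last half hour".
    static final long WINDOW_MILLIS = 30L * 60L * 1000L;

    /**
     * Removes entries whose timestamp key is older than the window and
     * returns the sum of the counts that remain.
     */
    static long pruneAndSum(Map<Long, Long> countsByTimestamp, long nowMillis) {
        long total = 0L;
        Iterator<Map.Entry<Long, Long>> it = countsByTimestamp.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, Long> entry = it.next();
            if (entry.getKey() < nowMillis - WINDOW_MILLIS) {
                it.remove(); // delete via the iterator, never via the map itself
            } else {
                total += entry.getValue();
            }
        }
        return total;
    }

    public static void main(String[] args) {
        Map<Long, Long> state = new HashMap<>();
        long now = 10_000_000L;
        state.put(now - WINDOW_MILLIS - 1, 5L); // expired entry
        state.put(now - 1_000L, 3L);            // still inside the window
        state.put(now, 4L);                     // current message
        long total = pruneAndSum(state, now);
        System.out.println("total=" + total + ", entries=" + state.size());
    }
}
```

In a Flink job the same loop would presumably run over the entries of the `MapState` inside `processElement`, with the total then compared against the configured threshold.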


Is this exception caused by the checkpoint asynchronously copying this state to RocksDB while I am deleting entries from the MapState inside processElement?


java.lang.Exception: Could not perform checkpoint 550 for operator KeyedProcess -> async wait operator -> Flat Map -> Sink: 写入redis库存 (16/20).

         at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:595)

         at org.apache.flink.streaming.runtime.io.BarrierBuffer.notifyCheckpoint(BarrierBuffer.java:396)

         at org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:292)

         at org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:200)

         at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:209)

         at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)

         at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)

         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)

         at java.lang.Thread.run(Thread.java:748)

Caused by: java.lang.Exception: Could not complete snapshot 550 for operator KeyedProcess -> async wait operator -> Flat Map -> Sink: 写入redis库存 (16/20).

         at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:422)

         at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1113)

         at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1055)

         at org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:729)

         at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:641)

         at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:586)

         ... 8 more

Caused by: java.util.ConcurrentModificationException

         at java.util.HashMap$HashIterator.nextNode(HashMap.java:1442)

         at java.util.HashMap$EntryIterator.next(HashMap.java:1476)

         at java.util.HashMap$EntryIterator.next(HashMap.java:1474)

         at com.esotericsoftware.kryo.serializers.MapSerializer.copy(MapSerializer.java:156)

         at com.esotericsoftware.kryo.serializers.MapSerializer.copy(MapSerializer.java:21)

         at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:862)

         at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:248)

         at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.copy(StreamElementSerializer.java:105)

         at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.copy(StreamElementSerializer.java:46)

         at org.apache.flink.runtime.state.ArrayListSerializer.copy(ArrayListSerializer.java:73)

         at org.apache.flink.runtime.state.PartitionableListState.<init>(PartitionableListState.java:68)

         at org.apache.flink.runtime.state.PartitionableListState.deepCopy(PartitionableListState.java:80)

         at org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy.snapshot(DefaultOperatorStateBackendSnapshotStrategy.java:88)

         at org.apache.flink.runtime.state.DefaultOperatorStateBackend.snapshot(DefaultOperatorStateBackend.java:261)

         at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:402)

         ... 13 more
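
The innermost cause in the trace is a `ConcurrentModificationException` thrown by the `HashMap` iterator while Kryo's `MapSerializer.copy` walks a map during the operator-state snapshot. The fail-fast behaviour behind that exception can be reproduced in plain Java, as in the minimal sketch below. It is not Flink code, and the class and method names (`FailFastDemo`, `copyFailsWhenMutated`, `copyOfSnapshotSucceeds`) are hypothetical. To stay deterministic, the "other writer" is simulated in a single thread by modifying the map in the middle of the copy loop.

```java
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.Map;

public class FailFastDemo {
    /**
     * Copies the map entry by entry while the same map is structurally
     * modified mid-iteration. Returns true if the iterator detected it.
     * The source must hold at least two entries for the check to fire.
     */
    static boolean copyFailsWhenMutated(Map<String, Integer> source) {
        Map<String, Integer> copy = new HashMap<>();
        try {
            for (Map.Entry<String, Integer> e : source.entrySet()) {
                copy.put(e.getKey(), e.getValue());
                // Simulated concurrent writer: adds a new key during the copy.
                source.put("late-" + copy.size(), 0);
            }
            return false;
        } catch (ConcurrentModificationException ex) {
            return true;
        }
    }

    /**
     * Takes a private snapshot first, then iterates the snapshot while the
     * original keeps changing. Returns the number of entries copied.
     */
    static int copyOfSnapshotSucceeds(Map<String, Integer> source) {
        Map<String, Integer> snapshot = new HashMap<>(source);
        Map<String, Integer> copy = new HashMap<>();
        for (Map.Entry<String, Integer> e : snapshot.entrySet()) {
            copy.put(e.getKey(), e.getValue());
            source.put("late-" + copy.size(), 0); // no longer affects the iteration
        }
        return copy.size();
    }

    public static void main(String[] args) {
        Map<String, Integer> a = new HashMap<>();
        a.put("x", 1);
        a.put("y", 2);
        System.out.println("mutated during copy fails: " + copyFailsWhenMutated(a));

        Map<String, Integer> b = new HashMap<>();
        b.put("x", 1);
        b.put("y", 2);
        System.out.println("entries copied from snapshot: " + copyOfSnapshotSucceeds(b));
    }
}
```

The sketch only illustrates the mechanism: any `HashMap` that is still being modified while something else iterates over it can fail this way, so the object to look for is whichever map is shared between the code that mutates it and the code that copies it.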
