flink-user-zh mailing list archives

From "eric" <erickxi...@qq.com>
Subject [State Backend] A question: restoring from a checkpoint fails
Date Tue, 30 Apr 2019 08:30:22 GMT
Hi all,


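I'm new to Flink and ran a program to test state checkpointing:
1) The data source is a socket, and the job uses keyed state (a keyed state backend). I submitted the job and let it run for a while.
2) Then I closed the source socket, and the job went into the FAILED state.
3) After waiting a few seconds, I reopened the source socket.
4) Flink then reconnected to the socket and tried to recover the job, but the recovery failed: the stored MapState was not restored.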


Environment:
    Flink: 1.8.0
    Flink Hadoop package: flink-shaded-hadoop2-uber-2.6.5-1.8.0.jar
    HDFS: hadoop2.6.0-cdh5.16.1
    Running in standalone mode; I tried both the filesystem and RocksDB state backends, and neither worked.



Error log:


Caused by: java.io.IOException: Stream closed
        at org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:892)
        at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:963)
        at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:757)
        at java.io.FilterInputStream.read(FilterInputStream.java:83)
        at org.apache.flink.runtime.fs.hdfs.HadoopDataInputStream.read(HadoopDataInputStream.java:84)
        at org.apache.flink.core.fs.FSDataInputStreamWrapper.read(FSDataInputStreamWrapper.java:51)
        at org.apache.flink.runtime.util.ForwardingInputStream.read(ForwardingInputStream.java:41)
        at java.io.DataInputStream.readUnsignedByte(DataInputStream.java:288)
        at org.apache.flink.types.StringValue.readString(StringValue.java:769)
        at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:75)
        at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:33)
        at org.apache.flink.api.common.typeutils.base.MapSerializer.deserialize(MapSerializer.java:148)
        at org.apache.flink.api.common.typeutils.base.MapSerializer.deserialize(MapSerializer.java:43)
        at org.apache.flink.runtime.state.heap.StateTableByKeyGroupReaders.lambda$createV2PlusReader$0(StateTableByKeyGroupReaders.java:74)
        at org.apache.flink.runtime.state.KeyGroupPartitioner$PartitioningResultKeyGroupReader.readMappingsInKeyGroup(KeyGroupPartitioner.java:297)
        at org.apache.flink.runtime.state.heap.HeapRestoreOperation.readKeyGroupStateData(HeapRestoreOperation.java:290)
        at org.apache.flink.runtime.state.heap.HeapRestoreOperation.readStateHandleStateData(HeapRestoreOperation.java:251)
        at org.apache.flink.runtime.state.heap.HeapRestoreOperation.restore(HeapRestoreOperation.java:153)
        at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:127)
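The innermost frames show DataInputStream.readUnsignedByte (called from StringValue.readString while MapSerializer deserializes a String) pulling bytes from an HDFS DFSInputStream that reports "Stream closed", which suggests the input stream was already closed while the heap backend was still reading the MapState. The plain-Java sketch below involves no Flink or HDFS; a BufferedInputStream is an assumed stand-in for the HDFS stream, and the class and method names are hypothetical. It only shows that the same exception type and message appear when a DataInputStream reads from an underlying stream that was closed first.

```java
import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;

// Hypothetical demo class, not part of Flink or Hadoop.
public class StreamClosedDemo {

    /**
     * Closes the underlying stream, then reads one unsigned byte through a
     * DataInputStream, mirroring the innermost frames of the trace
     * (DataInputStream.readUnsignedByte reading from an already-closed stream).
     * Returns the exception message, or null if the read unexpectedly succeeds.
     */
    public static String readAfterClose() {
        // BufferedInputStream is only an assumed stand-in for DFSInputStream.
        BufferedInputStream underlying =
                new BufferedInputStream(new ByteArrayInputStream(new byte[] {1, 2, 3}));
        DataInputStream in = new DataInputStream(underlying);
        try {
            underlying.close();    // stream closed before "deserialization" reads from it
            in.readUnsignedByte(); // BufferedInputStream throws IOException("Stream closed")
            return null;
        } catch (IOException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println("Caught: java.io.IOException: " + readAfterClose());
    }
}
```

Running main prints the caught message. This only illustrates what "Stream closed" means at the Java I/O level; it does not identify why the checkpoint stream was closed during the Flink 1.8.0 restore.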