flink-user-zh mailing list archives

From Congxian Qiu <qcx978132...@gmail.com>
Subject Re: Re: With RocksDB as the state backend, restoring the job fails on task restart: Could not restore keyed state backend for KeyedProcessOperator
Date Fri, 03 Apr 2020 10:09:19 GMT
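The path on HDFS is different from the local path. If you want to look at the HDFS path, you will probably need to inspect the checkpoint metadata, which is fairly tedious; see the related tests in CheckpointMetadataLoadingTest for reference.
I took another look at the TM log you posted. It looks like the outputStream.close() call at line 148 (RocksDBStateDownloader.java:148) is what failed. One odd thing: the outputStream here should be a local file, but the stack trace shows it going through HadoopFileSystem. Can you reproduce this reliably? If so, could you enable debug logging and post the JM/TM logs? A job that reproduces the problem would be even better.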

Best,
Congxian
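
The thread describes the restore as two phases: first download the state files, then restore from the downloaded files. The minimal sketch below illustrates that flow as a simplified model. It is not Flink's implementation: a local directory stands in for the checkpoint storage on HDFS, and the names `TwoPhaseRestoreSketch`, `download` and `linkIntoDb` are hypothetical. It shows one way a `NoSuchFileException` of the form `db/001888.sst -> <download dir>/001888.sst` can arise: `Files.createLink(link, existing)` fails when the file it should link to was never written.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.Arrays;
import java.util.List;

/**
 * Simplified model (NOT Flink's code) of an incremental RocksDB restore:
 * phase 1 downloads state files into a temporary directory, phase 2
 * hard-links them into the instance's "db" directory.
 * A local directory stands in for the remote checkpoint storage.
 */
public class TwoPhaseRestoreSketch {

    /** Phase 1: copy each named file from remoteDir into tmpDir. */
    static void download(Path remoteDir, Path tmpDir, List<String> files) throws IOException {
        Files.createDirectories(tmpDir);
        for (String name : files) {
            Files.copy(remoteDir.resolve(name), tmpDir.resolve(name),
                    StandardCopyOption.REPLACE_EXISTING);
        }
    }

    /** Phase 2: hard-link each downloaded file into dbDir. */
    static void linkIntoDb(Path tmpDir, Path dbDir, List<String> files) throws IOException {
        Files.createDirectories(dbDir);
        for (String name : files) {
            // Files.createLink(link, existing) throws NoSuchFileException
            // when the existing (downloaded) file is missing.
            Files.createLink(dbDir.resolve(name), tmpDir.resolve(name));
        }
    }

    public static void main(String[] args) throws IOException {
        Path base = Files.createTempDirectory("restore-sketch");
        Path remote = Files.createDirectories(base.resolve("remote"));
        Files.write(remote.resolve("001888.sst"), new byte[] {1, 2, 3});
        List<String> files = Arrays.asList("001888.sst");

        // Both phases succeed: the file is downloaded, then linked into db/.
        Path tmp = base.resolve("download");
        download(remote, tmp, files);
        linkIntoDb(tmp, base.resolve("db"), files);
        System.out.println("restored: " + Files.exists(base.resolve("db").resolve("001888.sst")));

        // Phase 1 "failed" (nothing was written), so phase 2 cannot link the file.
        Path emptyTmp = Files.createDirectories(base.resolve("download-failed"));
        try {
            linkIntoDb(emptyTmp, base.resolve("db2"), files);
        } catch (NoSuchFileException e) {
            System.out.println("link failed: " + e.getMessage());
        }
    }
}
```

On Linux the second message should name both paths in `link -> existing` order, like the exception in the log. The sketch does not show which file was actually missing in the reported failure.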


On Wed, Apr 1, 2020 at 5:18 PM chenxyz <chenxyz@163.com> wrote:

> Hi Congxian,
> I checked HDFS:
> /data/flink1_10/tmp/flink-io-01229972-48d4-4229-ac8c-33f0edfe5b7c/job_5ec178dc885a8f1a64c1925e182562e3_op_KeyedProcessOperator_da2b90ef97e5c844980791c8fe08b926__1_2__uuid_772b4663-f633-4ed5-a67a-d1904760a160
> is empty, and there is no db subdirectory under it either.
>
> On 2020-04-01 16:50:13, "Congxian Qiu" <qcx978132955@gmail.com> wrote:
> >Hi,
> >Restore can roughly be split into two parts: 1) downloading the files; 2) restoring from the downloaded files.
> >Judging from the TM log, the download seems to have failed. Please double-check whether this file exists:
> >/data/flink1_10/tmp/flink-io-01229972-48d4-4229-ac8c-33f0edfe5b7c/job_5ec178dc885a8f1a64c1925e182562e3_op_KeyedProcessOperator_da2b90ef97e5c844980791c8fe08b926__1_2__uuid_772b4663-f633-4ed5-a67a-d1904760a160/db/001888.sst
> >If the download did fail, you need to find out why.
> >
> >Best,
> >Congxian
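
The double-check suggested above can be run on the TaskManager host with a small stand-alone program such as the one below. `RestoreDirCheck` is a hypothetical helper, not part of Flink, and it uses only `java.nio.file`. It reports whether the operator's local instance directory, its `db` subdirectory and the named `.sst` file exist, and it lists the directory's immediate children. It inspects the local filesystem, not HDFS. The directory may already have been removed after the failed restore attempt.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Hypothetical diagnostic (not part of Flink): inspect a local RocksDB restore directory. */
public class RestoreDirCheck {

    /** Builds a short report about instanceDir, instanceDir/db and instanceDir/db/sstName. */
    static List<String> check(Path instanceDir, String sstName) throws IOException {
        List<String> report = new ArrayList<>();
        Path dbDir = instanceDir.resolve("db");
        report.add("instance dir exists: " + Files.isDirectory(instanceDir));
        report.add("db dir exists: " + Files.isDirectory(dbDir));
        report.add(sstName + " exists: " + Files.exists(dbDir.resolve(sstName)));
        if (Files.isDirectory(instanceDir)) {
            // Immediate children, e.g. "db" and any temporary download directories.
            try (Stream<Path> children = Files.list(instanceDir)) {
                List<String> names = children
                        .map(p -> p.getFileName().toString())
                        .sorted()
                        .collect(Collectors.toList());
                report.add("children: " + names);
            }
        }
        return report;
    }

    public static void main(String[] args) throws IOException {
        if (args.length == 0) {
            System.out.println("usage: RestoreDirCheck <instance-dir> [sst-file-name]");
            return;
        }
        Path instanceDir = Paths.get(args[0]);
        String sstName = args.length > 1 ? args[1] : "001888.sst";
        for (String line : check(instanceDir, sstName)) {
            System.out.println(line);
        }
    }
}
```

Usage: `java RestoreDirCheck <instance-dir> 001888.sst`. Here `<instance-dir>` is the `.../job_..._op_KeyedProcessOperator_...__uuid_...` directory taken from the exception message.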
> >
> >
> >On Wed, Apr 1, 2020 at 3:02 PM chenxyz <chenxyz@163.com> wrote:
> >
> >> My job uses RocksDB as the state backend. When the job restarts after an exception, the restore often fails with "Could not restore keyed state backend for KeyedProcessOperator". How can I fix this?
> >>
> >> Version: 1.10, standalone
> >>
> >> Configuration:
> >>
> >> state.backend: rocksdb
> >> state.checkpoints.dir: hdfs://nameservice1/data/flink1_10/checkpoint
> >> state.backend.incremental: true
> >> jobmanager.execution.failover-strategy: region
> >> io.tmp.dirs: /data/flink1_10/tmp
> >>
> >> Checkpoint configuration of the job:
> >>
> >> env.enableCheckpointing(2 * 60 * 1000);
> >> env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
> >> env.getCheckpointConfig().setMinPauseBetweenCheckpoints(2 * 60 * 1000);
> >> env.getCheckpointConfig().setCheckpointTimeout(60000);
> >> env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
> >> env.getCheckpointConfig().setTolerableCheckpointFailureNumber(12);
> >> env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
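
In this job, `setMinPauseBetweenCheckpoints` equals the checkpoint interval (both 2 minutes), so checkpoints are likely spaced further apart than the interval alone suggests. The sketch below is a simplified model, not Flink's `CheckpointCoordinator`. It assumes the next checkpoint may start no earlier than one interval after the previous one was triggered, and no earlier than the minimum pause after it completed. `CheckpointTimingSketch` and `nextTrigger` are hypothetical names.

```java
/**
 * Simplified model (an assumption, not Flink's scheduler) of when the next
 * checkpoint may start, using the interval and min-pause values above.
 */
public class CheckpointTimingSketch {

    static final long INTERVAL_MS = 2 * 60 * 1000;  // enableCheckpointing(2 * 60 * 1000)
    static final long MIN_PAUSE_MS = 2 * 60 * 1000; // setMinPauseBetweenCheckpoints(2 * 60 * 1000)

    /**
     * Earliest start of the next checkpoint: at least one interval after the
     * previous trigger and at least the min pause after the previous completion.
     */
    static long nextTrigger(long lastTriggerMs, long lastCompletionMs) {
        return Math.max(lastTriggerMs + INTERVAL_MS, lastCompletionMs + MIN_PAUSE_MS);
    }

    public static void main(String[] args) {
        // Checkpoint triggered at t = 0, completed after 30 s.
        System.out.println(nextTrigger(0, 30_000));
        // Checkpoint triggered at t = 0, completed after 45 s.
        System.out.println(nextTrigger(0, 45_000));
    }
}
```

Under this model, a checkpoint that takes 30 s pushes the next one to `nextTrigger(0, 30_000) == 150_000` ms, which is 2.5 minutes rather than 2. The effective spacing is therefore roughly the interval plus the checkpoint duration.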
> >>
> >> Log:
> >>
> >> 2020-04-01 11:13:03
> >> java.lang.Exception: Exception while creating StreamOperatorStateContext.
> >>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:191)
> >>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:255)
> >>     at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1006)
> >>     at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454)
> >>     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
> >>     at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449)
> >>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461)
> >>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
> >>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
> >>     at java.lang.Thread.run(Thread.java:748)
> >> Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for KeyedProcessOperator_da2b90ef97e5c844980791c8fe08b926_(1/2) from any of the 1 provided restore options.
> >>     at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
> >>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:304)
> >>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:131)
> >>     ... 9 more
> >> Caused by: org.apache.flink.runtime.state.BackendBuildingException: Caught unexpected exception.
> >>     at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:336)
> >>     at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:548)
> >>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:288)
> >>     at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
> >>     at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
> >>     ... 11 more
> >> Caused by: java.nio.file.NoSuchFileException: /data/flink1_10/tmp/flink-io-01229972-48d4-4229-ac8c-33f0edfe5b7c/job_5ec178dc885a8f1a64c1925e182562e3_op_KeyedProcessOperator_da2b90ef97e5c844980791c8fe08b926__1_2__uuid_772b4663-f633-4ed5-a67a-d1904760a160/db/001888.sst -> /data/flink1_10/tmp/flink-io-01229972-48d4-4229-ac8c-33f0edfe5b7c/job_5ec178dc885a8f1a64c1925e182562e3_op_KeyedProcessOperator_da2b90ef97e5c844980791c8fe08b926__1_2__uuid_772b4663-f633-4ed5-a67a-d1904760a160/3e979cc5-82c1-42bf-a269-1ce6e43f3e10/001888.sst
> >>     at sun.nio.fs.UnixException.translateToIOException(UnixException.java:86)
> >>     at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
> >>     at sun.nio.fs.UnixFileSystemProvider.createLink(UnixFileSystemProvider.java:476)
> >>     at java.nio.file.Files.createLink(Files.java:1086)
> >>     at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreInstanceDirectoryFromPath(RocksDBIncrementalRestoreOperation.java:480)
> >>     at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreFromLocalState(RocksDBIncrementalRestoreOperation.java:218)
> >>     at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreFromRemoteState(RocksDBIncrementalRestoreOperation.java:194)
> >>     at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreWithoutRescaling(RocksDBIncrementalRestoreOperation.java:168)
> >>     at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restore(RocksDBIncrementalRestoreOperation.java:154)
> >>     at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:279)
> >>     ... 15 more
> >>
> >> TaskManager error log:
> >>
> >> 2020-04-01 14:48:10,726 ERROR org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder  - Caught unexpected exception.
> >> java.io.InterruptedIOException: Interrupted while waiting for data to be acknowledged by pipeline
> >>     at org.apache.hadoop.hdfs.DFSOutputStream.waitForAckedSeqno(DFSOutputStream.java:2147)
> >>     at org.apache.hadoop.hdfs.DFSOutputStream.flushInternal(DFSOutputStream.java:2128)
> >>     at org.apache.hadoop.hdfs.DFSOutputStream.close(DFSOutputStream.java:2229)
> >>     at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
> >>     at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106)
> >>     at org.apache.flink.runtime.fs.hdfs.HadoopDataOutputStream.close(HadoopDataOutputStream.java:52)
> >>     at org.apache.flink.core.fs.ClosingFSDataOutputStream.close(ClosingFSDataOutputStream.java:64)
> >>     at org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.downloadDataForStateHandle(RocksDBStateDownloader.java:148)
> >>     at org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.lambda$createDownloadRunnables$0(RocksDBStateDownloader.java:109)
> >>     at org.apache.flink.util.function.ThrowingRunnable.lambda$unchecked$0(ThrowingRunnable.java:50)
> >>     at java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1626)
> >>     at org.apache.flink.runtime.concurrent.DirectExecutorService.execute(DirectExecutorService.java:211)
> >>     at java.util.concurrent.CompletableFuture.asyncRunStage(CompletableFuture.java:1640)
> >>     at java.util.concurrent.CompletableFuture.runAsync(CompletableFuture.java:1858)
> >>     at org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.downloadDataForAllStateHandles(RocksDBStateDownloader.java:83)
> >>     at org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.transferAllStateDataToDirectory(RocksDBStateDownloader.java:67)
> >>     at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.transferRemoteStateToLocalDirectory(RocksDBIncrementalRestoreOperation.java:230)
> >>     at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreFromRemoteState(RocksDBIncrementalRestoreOperation.java:195)
> >>     at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreWithoutRescaling(RocksDBIncrementalRestoreOperation.java:168)
> >>     at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restore(RocksDBIncrementalRestoreOperation.java:154)
> >>     at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:279)
> >>     at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:548)
> >>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:288)
> >>     at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
> >>     at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
> >>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:304)
> >>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:131)
> >>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:255)
> >>     at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1006)
> >>     at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454)
> >>     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
> >>     at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449)
> >>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461)
> >>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
> >>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
> >>     at java.lang.Thread.run(Thread.java:748)
> >> 2020-04-01 14:48:10,726 WARN  org.apache.flink.streaming.api.operators.BackendRestorerProcedure  - Exception while restoring keyed state backend for KeyedProcessOperator_0cead2d40df6e304d3168f6366c79e3f_(1/2) from alternative (1/1), will retry while more alternatives are available.
> >> org.apache.flink.runtime.state.BackendBuildingException: Caught unexpected exception.
> >>     at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:336)
> >>     at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:548)
> >>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:288)
> >>     at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
> >>     at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
> >>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:304)
> >>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:131)
> >>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:255)
> >>     at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1006)
> >>     at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454)
> >>     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
> >>     at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449)
> >>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461)
> >>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
> >>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
> >>     at java.lang.Thread.run(Thread.java:748)
> >> Caused by: java.io.InterruptedIOException: Interrupted while waiting for data to be acknowledged by pipeline
> >>     at org.apache.hadoop.hdfs.DFSOutputStream.waitForAckedSeqno(DFSOutputStream.java:2147)
> >>     at org.apache.hadoop.hdfs.DFSOutputStream.flushInternal(DFSOutputStream.java:2128)
> >>     at org.apache.hadoop.hdfs.DFSOutputStream.close(DFSOutputStream.java:2229)
> >>     at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
> >>     at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106)
> >>     at org.apache.flink.runtime.fs.hdfs.HadoopDataOutputStream.close(HadoopDataOutputStream.java:52)
> >>     at org.apache.flink.core.fs.ClosingFSDataOutputStream.close(ClosingFSDataOutputStream.java:64)
> >>     at org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.downloadDataForStateHandle(RocksDBStateDownloader.java:148)
> >>     at org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.lambda$createDownloadRunnables$0(RocksDBStateDownloader.java:109)
> >>     at org.apache.flink.util.function.ThrowingRunnable.lambda$unchecked$0(ThrowingRunnable.java:50)
> >>     at java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1626)
> >>     at org.apache.flink.runtime.concurrent.DirectExecutorService.execute(DirectExecutorService.java:211)
> >>     at java.util.concurrent.CompletableFuture.asyncRunStage(CompletableFuture.java:1640)
> >>     at java.util.concurrent.CompletableFuture.runAsync(CompletableFuture.java:1858)
> >>     at org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.downloadDataForAllStateHandles(RocksDBStateDownloader.java:83)
> >>     at org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.transferAllStateDataToDirectory(RocksDBStateDownloader.java:67)
> >>     at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.transferRemoteStateToLocalDirectory(RocksDBIncrementalRestoreOperation.java:230)
> >>     at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreFromRemoteState(RocksDBIncrementalRestoreOperation.java:195)
> >>     at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreWithoutRescaling(RocksDBIncrementalRestoreOperation.java:168)
> >>     at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restore(RocksDBIncrementalRestoreOperation.java:154)
> >>     at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:279)
> >>     ... 15 more
>
