flink-user-zh mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Yun Tang <myas...@live.com>
Subject Re: PathIsNotEmptyDirectoryException 异常
Date Tue, 27 Aug 2019 09:11:16 GMT
在Flink看来,删除操作都是同一个线程内顺序执行的,S3AFileSystem.delete
接口看上去也不是异步执行的。出异常的时候,能检查一下chk-50832 目录里面还有什么么?


________________________________
From: Andrew Lin <chendonglin521@gmail.com>
Sent: Tuesday, August 27, 2019 16:53
To: wangpei@cmcm.com <wangpei@cmcm.com>; user-zh@flink.apache.org <user-zh@flink.apache.org>
Subject: Fwd: PathIsNotEmptyDirectoryException 异常

https://issues.apache.org/jira/browse/FLINK-13856 <https://issues.apache.org/jira/browse/FLINK-13856>

可能是 s3a的问题,删除操作是不是阻塞的?
报错看起来是因删除文件夹时候前面的删除操作还没完成。文件不是空的


> 下面是被转发的邮件:
>
> 发件人: 王佩 <wangpei@cmcm.com>
> 主题: PathIsNotEmptyDirectoryException 异常
> 日期: 2019年8月24日 GMT+8 下午4:49:36
> 收件人: user-zh <user-zh@flink.apache.org>
> 回复-收件人: user-zh@flink.apache.org
>
> Flink 版本 1.8.0
> 采用RocksDBStateBackend
>
> 程序上线后,运行过程中,经常会有如下异常:
> 2019-08-24 00:34:05,192 WARN
> org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore  -
> Fail to subsume the old checkpoint.
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.PathIsNotEmptyDirectoryException:
> `s3a://****/********/a086ec215da2088daaa07af5ca8e5586/chk-50832': Directory
> is not empty
> at
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.innerDelete(S3AFileSystem.java:1752)
> at
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.delete(S3AFileSystem.java:1697)
> at
> org.apache.flink.fs.s3.common.hadoop.HadoopFileSystem.delete(HadoopFileSystem.java:147)
> at
> org.apache.flink.runtime.state.filesystem.FsCompletedCheckpointStorageLocation.disposeStorageLocation(FsCompletedCheckpointStorageLocation.java:70)
> at
> org.apache.flink.runtime.checkpoint.CompletedCheckpoint.doDiscard(CompletedCheckpoint.java:264)
> at
> org.apache.flink.runtime.checkpoint.CompletedCheckpoint.discardOnSubsume(CompletedCheckpoint.java:219)
> at
> org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore.addCheckpoint(StandaloneCompletedCheckpointStore.java:72)
> at
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:844)
> at
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveAcknowledgeMessage(CheckpointCoordinator.java:756)
> at
> org.apache.flink.runtime.jobmaster.JobMaster.lambda$acknowledgeCheckpoint$9(JobMaster.java:676)
> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
> at
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> 2019-08-24 02:58:03,572 WARN
> org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore  -
> Fail to subsume the old checkpoint.
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.PathIsNotEmptyDirectoryException:
> `s3a://****/********/a086ec215da2088daaa07af5ca8e5586/chk-51120': Directory
> is not empty
> at
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.innerDelete(S3AFileSystem.java:1752)
> at
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.delete(S3AFileSystem.java:1697)
> at
> org.apache.flink.fs.s3.common.hadoop.HadoopFileSystem.delete(HadoopFileSystem.java:147)
> at
> org.apache.flink.runtime.state.filesystem.FsCompletedCheckpointStorageLocation.disposeStorageLocation(FsCompletedCheckpointStorageLocation.java:70)
> at
> org.apache.flink.runtime.checkpoint.CompletedCheckpoint.doDiscard(CompletedCheckpoint.java:264)
> at
> org.apache.flink.runtime.checkpoint.CompletedCheckpoint.discardOnSubsume(CompletedCheckpoint.java:219)
> at
> org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore.addCheckpoint(StandaloneCompletedCheckpointStore.java:72)
> at
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:844)
> at
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveAcknowledgeMessage(CheckpointCoordinator.java:756)
> at
> org.apache.flink.runtime.jobmaster.JobMaster.lambda$acknowledgeCheckpoint$9(JobMaster.java:676)
> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
> at
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> 2019-08-24 03:21:00,784 WARN
> org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore  -
> Fail to subsume the old checkpoint.
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.PathIsNotEmptyDirectoryException:
> `s3a://****/********/a086ec215da2088daaa07af5ca8e5586/chk-51166': Directory
> is not empty
> at
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.innerDelete(S3AFileSystem.java:1752)
> at
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.delete(S3AFileSystem.java:1697)
> at
> org.apache.flink.fs.s3.common.hadoop.HadoopFileSystem.delete(HadoopFileSystem.java:147)
> at
> org.apache.flink.runtime.state.filesystem.FsCompletedCheckpointStorageLocation.disposeStorageLocation(FsCompletedCheckpointStorageLocation.java:70)
> at
> org.apache.flink.runtime.checkpoint.CompletedCheckpoint.doDiscard(CompletedCheckpoint.java:264)
> at
> org.apache.flink.runtime.checkpoint.CompletedCheckpoint.discardOnSubsume(CompletedCheckpoint.java:219)
> at
> org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore.addCheckpoint(StandaloneCompletedCheckpointStore.java:72)
> at
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:844)
> at
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveAcknowledgeMessage(CheckpointCoordinator.java:756)
> at
> org.apache.flink.runtime.jobmaster.JobMaster.lambda$acknowledgeCheckpoint$9(JobMaster.java:676)
> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
> at
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
> 请教下,异常的原因,可能是啥。

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message