flink-issues mailing list archives

From "Till Rohrmann (JIRA)" <j...@apache.org>
Subject [jira] [Updated] (FLINK-9587) ContinuousFileMonitoringFunction crashes on short living files
Date Mon, 06 Aug 2018 08:36:00 GMT

     [ https://issues.apache.org/jira/browse/FLINK-9587?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Till Rohrmann updated FLINK-9587:
    Fix Version/s:     (was: 1.6.0)

> ContinuousFileMonitoringFunction crashes on short living files
> --------------------------------------------------------------
>                 Key: FLINK-9587
>                 URL: https://issues.apache.org/jira/browse/FLINK-9587
>             Project: Flink
>          Issue Type: Bug
>          Components: FileSystem, Streaming, Streaming Connectors
>    Affects Versions: 1.5.0
>         Environment: Flink 1.5 running as a standalone cluster.
>            Reporter: Andrei Shumanski
>            Priority: Critical
>             Fix For: 1.6.1, 1.7.0
> Hi,
> We use Flink to monitor a directory for new files. The filesystem is a MapR FUSE mount that looks like a local FS.
> The files are copied to the directory by another process that uses the rsync command. While a file is not yet completely written, rsync creates a temporary file with a name like ".file.txt.uM6MfZ", where the last extension is a random string.
> When the copying is done, the file is renamed to its final name, "file.txt".
> The bug is that Flink does not handle this behavior correctly: it does not take into account that files in the directory might be deleted or renamed while the directory is being scanned.
> We are getting error traces:
> {code:java}
> java.io.FileNotFoundException: File file:/mapr/landingarea/cId=2075/.file_00231.cpio.gz.uM6MfZ does not exist or the user running Flink ('root') has insufficient permissions to access it.
> at org.apache.flink.core.fs.local.LocalFileSystem.getFileStatus(LocalFileSystem.java:115)
> at org.apache.flink.core.fs.local.LocalFileSystem.listStatus(LocalFileSystem.java:177)
> at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.listStatus(SafetyNetWrapperFileSystem.java:92)
> at org.apache.flink.api.common.io.FileInputFormat.addFilesInDir(FileInputFormat.java:707)
> at org.apache.flink.api.common.io.FileInputFormat.addFilesInDir(FileInputFormat.java:710)
> at org.apache.flink.api.common.io.FileInputFormat.addFilesInDir(FileInputFormat.java:710)
> at org.apache.flink.api.common.io.FileInputFormat.addFilesInDir(FileInputFormat.java:710)
> at org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:591)
> at org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction.getInputSplitsSortedByModTime(ContinuousFileMonitoringFunction.java:270)
> at org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction.monitorDirAndForwardSplits(ContinuousFileMonitoringFunction.java:242)
> at org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction.run(ContinuousFileMonitoringFunction.java:206)
> at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87)
> at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:56)
> at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:99)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
> at java.lang.Thread.run(Thread.java:748)
> {code}
> In LocalFileSystem.listStatus(final Path f), we read the list of files in a directory and then create a LocalFileStatus object for each of the files. But a file might be removed in the interval between these two operations.
> I do not see any way to handle this exception in our code.
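
The list-then-stat race described above can be illustrated with a minimal sketch that uses plain java.nio rather than Flink's FileSystem classes. The class name TolerantListing and the methods statOrNull and listExistingFiles are hypothetical and exist only for this illustration; this is not the change applied in Flink. The assumption is that an entry which vanishes between the directory listing and the attribute lookup can simply be skipped.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Flink.
class TolerantListing {

    /** Returns the attributes of the path, or null if the path no longer exists. */
    static BasicFileAttributes statOrNull(Path path) {
        try {
            return Files.readAttributes(path, BasicFileAttributes.class);
        } catch (NoSuchFileException e) {
            // The entry was deleted or renamed after it was listed.
            return null;
        } catch (IOException e) {
            // Any other I/O failure is still reported to the caller.
            throw new UncheckedIOException(e);
        }
    }

    /** Lists the regular files in dir, skipping entries that vanish mid-scan. */
    static List<Path> listExistingFiles(Path dir) {
        List<Path> result = new ArrayList<>();
        try (DirectoryStream<Path> entries = Files.newDirectoryStream(dir)) {
            for (Path entry : entries) {
                // Step 2 of the race: stat each entry returned by step 1.
                BasicFileAttributes attrs = statOrNull(entry);
                if (attrs != null && attrs.isRegularFile()) {
                    result.add(entry);
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return result;
    }

    public static void main(String[] args) {
        Path dir = Paths.get(args.length > 0 ? args[0] : ".");
        for (Path file : listExistingFiles(dir)) {
            System.out.println(file);
        }
    }
}
```

With this shape, a temporary rsync file such as ".file.txt.uM6MfZ" that is renamed between the two steps is dropped from the result instead of failing the whole scan; the renamed "file.txt" would then be picked up by a later scan.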

This message was sent by Atlassian JIRA
