spark-issues mailing list archives

From "Gil Vernik (JIRA)" <>
Subject [jira] [Commented] (SPARK-25155) Streaming from storage doesn't work when no directories exists
Date Mon, 20 Aug 2018 03:51:00 GMT


Gil Vernik commented on SPARK-25155:

[] what is your input on the issue I reported? Do you think it's a bug? What
do you think of the proposed change? Thanks

> Streaming from storage doesn't work when no directories exists
> --------------------------------------------------------------
>                 Key: SPARK-25155
>                 URL:
>             Project: Spark
>          Issue Type: Bug
>          Components: DStreams, Structured Streaming
>    Affects Versions: 2.3.1
>            Reporter: Gil Vernik
>            Priority: Minor
> I have an issue related to the `org.apache.spark.streaming.dstream.FileInputDStream` class.
> Streaming for the given path is supposed to pick up new files only (based on the previous
> run's timestamp). However, the code in Spark first obtains directories, then for each
> directory finds the new files. Here is the relevant code:
> val directoryFilter = new PathFilter {
>   override def accept(path: Path): Boolean = fs.getFileStatus(path).isDirectory
> }
> val directories = fs.globStatus(directoryPath, directoryFilter).map(_.getPath)
> val newFiles = directories.flatMap(dir =>
>   fs.listStatus(dir, newFileFilter).map(_.getPath.toString))
> This is not optimal, as it always requires two accesses. In addition, this seems to
> be buggy.
> I have an S3 bucket “mydata” with objects “a.csv” and “b.csv”. I noticed
> that fs.globStatus(“s3a://mydata/”, directoryFilter).map(_.getPath) returned 0 directories,
> and so “a.csv” and “b.csv” were not picked up by Spark.
> I tried to set the path to “s3a://mydata/*” and it didn't work either.
> I experienced the same problematic behavior with the local file system when I tried to stream
> from “/Users/streaming/*”.
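The reported behavior can be reproduced outside Spark with a small java.nio sketch (editor's illustration, not Spark's actual code): when the target contains only plain files, a directories-only filter over the listing leaves nothing to descend into, so the per-directory file scan never sees the files.

```scala
import java.nio.file.{Files, Path}
import scala.jdk.CollectionConverters._

// Sketch of the reported problem on the local filesystem.
// Files.list stands in for fs.globStatus; Files.isDirectory stands in
// for the directoryFilter used by FileInputDStream.
val root: Path = Files.createTempDirectory("mydata")
Files.createFile(root.resolve("a.csv"))
Files.createFile(root.resolve("b.csv"))

val globbed = Files.list(root).iterator().asScala.toSeq
// Keep only directories, as the current Spark code does.
val directories = globbed.filter(p => Files.isDirectory(p))
// No directories exist, so the second listing has nothing to scan
// and a.csv / b.csv are never picked up.
println(directories.isEmpty)
```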
>  I suggest changing the code in Spark so that it first performs the listing without directoryFilter,
> which does not seem to be needed at all. The code could be
> val directoriesOrfiles = fs.globStatus(directoryPath).map(_.getPath)
> The flow would then be (for each entry in directoriesOrfiles):
>  * If a data object: Spark will apply newFileFilter to it directly
>  * If a directory: the existing code will perform an additional listing of that directory
> This way it will pick up both files at the root of the path and the contents of directories.
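A minimal sketch of the proposed single-pass flow, again using java.nio in place of the Hadoop FileSystem API (the helper name listNewFiles and the newFileFilter stand-in are illustrative assumptions, not Spark's actual code):

```scala
import java.nio.file.{Files, Path}
import scala.jdk.CollectionConverters._

// One globStatus-style pass over the path, with no directory filter:
// plain files at the root are kept directly, and directories get the
// same one-level listing the existing code already performs.
def listNewFiles(root: Path, newFileFilter: Path => Boolean): Seq[String] = {
  val directoriesOrFiles = Files.list(root).iterator().asScala.toSeq
  directoriesOrFiles.flatMap { entry =>
    if (Files.isDirectory(entry))
      // directory: additional listing, as in the existing code
      Files.list(entry).iterator().asScala.toSeq
        .filter(p => Files.isRegularFile(p) && newFileFilter(p))
        .map(_.toString)
    else if (newFileFilter(entry))
      Seq(entry.toString)  // file at the root of the path: now picked up
    else
      Seq.empty
  }
}

// Demo: a root with two plain files and one subdirectory.
val root = Files.createTempDirectory("mydata")
Files.createFile(root.resolve("a.csv"))
Files.createFile(root.resolve("b.csv"))
val sub = Files.createDirectory(root.resolve("day1"))
Files.createFile(sub.resolve("c.csv"))

val picked = listNewFiles(root, _ => true)
  .map(p => Path.of(p).getFileName.toString)
println(picked.sorted)  // both root files and the directory's content
```

With the current directories-only filter, only c.csv would be found; the single pass also returns a.csv and b.csv from the root.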

This message was sent by Atlassian JIRA
