flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Averell <lvhu...@gmail.com>
Subject Strange behaviour with checkpointing and custom FilePathFilter
Date Thu, 20 Sep 2018 22:51:46 GMT
Good day everyone,

I have about 100 thousand files to read, and a custom FilePathFilter with a
simple filterPath method defined as below (the custom part is only to check
file-size and skip files with size = 0) 
	override def filterPath(filePath: Path): Boolean = {
		filePath == null ||
				filePath.getName.startsWith(".") ||
				filePath.getName.startsWith("_") ||
				filePath.getName.contains(FilePathFilter.HADOOP_COPYING) ||
				{
						def fileStatus = filePath.getFileSystem.getFileStatus(filePath)
						!fileStatus.isDir && fileStatus.getLen == 0
				}
	}

It is running fine either when I disable checkpointing or when I use the
default FilePathFilter. It takes about 7 minutes to finished processing all
files (from source to sink).
However, when I have both, customer filter and checkpointing, it usually
takes 15-20 minutes for Flink to start reading my files (in Flink GUI, the
CustomFileSource monitor generates 0 records during that 15-20 minutes
period)

Could someone please help with this? 

Thank you very much for your time.
Best regards,
Averell



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Mime
View raw message