flink-user mailing list archives

From Kostas Kloudas <k.klou...@data-artisans.com>
Subject Re: readFile, DataStream
Date Mon, 13 Nov 2017 09:47:31 GMT
Hi Juan,

The problem is that once a file with a given modification timestamp is processed and the
global modification timestamp is advanced to that value, all files with that timestamp are
considered processed, including ones that show up later.

The solution is not to remove the = from the check modificationTime <= globalModificationTime
in ContinuousFileMonitoringFunction, as this would lead to duplicates: files already processed
at that timestamp would be picked up again on the next scan.
The solution, in my opinion, is to keep a list of the filenames (or hashes) of the files processed
for the last globalModTimestamp (and only for that timestamp), and when new entries show up with
the same timestamp, to check whether the name of the file they belong to is already in that list.

This way you pay a bit of memory but you get what you want.
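
To make the idea concrete, here is a minimal sketch in plain Java. It is not the actual
ContinuousFileMonitoringFunction code: the class ModTimeTracker and the method shouldProcess
are hypothetical names used only for illustration, and the sketch assumes that within one
directory scan the files are offered in ascending order of modification time.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical helper, not part of Flink: tracks the newest modification time
// seen so far plus the names of the files already processed at exactly that time.
class ModTimeTracker {
    private long globalModificationTime = Long.MIN_VALUE;
    private final Set<String> processedAtGlobalTime = new HashSet<>();

    // Returns true if the file should be processed, false if it was already handled.
    // Assumption: callers offer files sorted by ascending modification time.
    boolean shouldProcess(String path, long modificationTime) {
        if (modificationTime < globalModificationTime) {
            return false; // strictly older than the watermark: already handled
        }
        if (modificationTime == globalModificationTime) {
            // Same timestamp: process only if this name has not been seen yet.
            return processedAtGlobalTime.add(path);
        }
        // Strictly newer: advance the watermark and forget the older names,
        // so the set only ever holds files for the latest timestamp.
        globalModificationTime = modificationTime;
        processedAtGlobalTime.clear();
        processedAtGlobalTime.add(path);
        return true;
    }

    public static void main(String[] args) {
        ModTimeTracker tracker = new ModTimeTracker();
        System.out.println(tracker.shouldProcess("a", 100L)); // true
        System.out.println(tracker.shouldProcess("b", 100L)); // true, same timestamp but new name
        System.out.println(tracker.shouldProcess("a", 100L)); // false, duplicate
        System.out.println(tracker.shouldProcess("d", 101L)); // true, newer timestamp
    }
}
```

For exactly-once guarantees the set of names would presumably have to be checkpointed together
with the timestamp; the sketch leaves that out.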

What do you think?


> On Nov 10, 2017, at 12:54 PM, Juan Miguel Cejuela <juanmi@tagtog.net> wrote:
> Hi there,
> I’m trying to watch a directory for new incoming files (with StreamExecutionEnvironment#readFile)
> with subsecond latency (a watch interval of ~100 ms, using the flag FileProcessingMode.PROCESS_CONTINUOUSLY).
> If many files come in within the watch interval, Flink doesn’t seem to notice them, and as a
> result the files do not get processed. The behavior also seems nondeterministic, as it likely
> depends on timeouts and so on. For example, 10 new files come in at once (that is, essentially
> in parallel) and perhaps 3 files get processed, but the remaining 7 don’t.
> I’ve extended FileInputFormat with my own implementation, which does little more than log,
> in the open function, when a new file comes in. That’s how I know that many files get lost.
> On the other hand, when I restart Flink, all the files in the directory are immediately
> processed. This is the expected behavior and works fine.
> The situation of unprocessed files is a bummer.
> Am I doing something wrong? Do I need to set something in the configuration? Is it a
> bug in Flink?
> Hopefully I described my problem clearly.
> Thank you.
