flink-user mailing list archives

From Fabian Hueske <fhue...@gmail.com>
Subject Re: Small-files source - partitioning based on prefix of file
Date Tue, 28 Aug 2018 09:03:24 GMT
Hi Averell,

Barriers are injected into the regular data flow by source functions.
In the case of a file monitoring source, the ContinuousFileMonitoringFunction
injects the barriers into the stream of file splits that it passes to the
ContinuousFileReaderOperator.
The reader operator puts the splits into a queue and processes them with a
dedicated split reader thread. All state-modifying operations of the thread
(emitting a record, opening a new split, etc.) are guarded by a checkpoint lock.
When the reader operator receives a barrier, the checkpointing logic requests
the lock, which forces the split reader thread to pause. It then requests the
current state of the thread and writes it into the checkpoint.
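
The locking scheme described above can be sketched in plain Java. This is a
simplified stand-in and not Flink's actual code: the class
CheckpointLockSketch, its fields, and the string-based splits are all
hypothetical, and every split is assumed to hold the same number of records.
A real reader would call readerStep() in a loop on its own thread.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical, simplified model of a split reader guarded by a checkpoint lock.
// None of these names are Flink classes.
class CheckpointLockSketch {

    /** Copy of the reader's progress, as it would be written to a checkpoint. */
    static final class ReaderState {
        final List<String> pendingSplits; // splits not yet opened
        final String currentSplit;        // split being read, or null
        final int offset;                 // records already emitted from currentSplit

        ReaderState(List<String> pendingSplits, String currentSplit, int offset) {
            this.pendingSplits = pendingSplits;
            this.currentSplit = currentSplit;
            this.offset = offset;
        }
    }

    private final Object checkpointLock = new Object();
    private final Queue<String> pending = new ArrayDeque<>();
    private final List<String> emitted = new ArrayList<>();
    private final int recordsPerSplit; // assumption: fixed, and at least 1
    private String current;
    private int offset;

    CheckpointLockSketch(int recordsPerSplit) {
        this.recordsPerSplit = recordsPerSplit;
    }

    /** Called when a split arrives from upstream. */
    void addSplit(String split) {
        synchronized (checkpointLock) {
            pending.add(split);
        }
    }

    /** One unit of reader work; returns false when there is nothing to read. */
    boolean readerStep() {
        synchronized (checkpointLock) {
            if (current == null) {
                current = pending.poll(); // open the next split
                offset = 0;
                if (current == null) {
                    return false;
                }
            }
            emitted.add(current + "#" + offset); // emit one record
            offset++;
            if (offset == recordsPerSplit) {
                current = null; // split finished
                offset = 0;
            }
            return true;
        }
    }

    /** Called when a barrier arrives: waits until the reader is between steps. */
    ReaderState snapshot() {
        synchronized (checkpointLock) {
            return new ReaderState(new ArrayList<>(pending), current, offset);
        }
    }
}
```

Because snapshot() and readerStep() synchronize on the same object, a snapshot
can never observe a half-emitted record or a half-opened split.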

In order to be able to properly checkpoint the state of the reading thread
within a split, the InputFormat that is used to read the files must
implement the CheckpointableInputFormat interface. Otherwise, after a
recovery, a split that was only partially read will be read again from the
start.
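
To illustrate what resuming within a split means, here is a minimal sketch in
plain Java. It does not use Flink: the Resumable interface is a hypothetical
stand-in modelled on the two methods of CheckpointableInputFormat
(getCurrentState and reopen), the split is just an in-memory list of lines,
and the state is the index of the next line. Please check the Flink Javadoc
for the exact signatures.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a checkpointable input format; not Flink code.
class ResumableReadSketch {

    /** Assumed shape: expose the read position and reopen a split at that position. */
    interface Resumable<S, T> {
        T getCurrentState();
        void reopen(S split, T state);
    }

    /** Reads an in-memory "split" line by line; the state is the next line index. */
    static final class LineFormat implements Resumable<List<String>, Integer> {
        private List<String> lines;
        private int next;

        /** Fresh open: always starts at the first record. */
        void open(List<String> split) {
            lines = split;
            next = 0;
        }

        @Override
        public Integer getCurrentState() {
            return next;
        }

        /** Restore: continue from the position stored in the checkpoint. */
        @Override
        public void reopen(List<String> split, Integer state) {
            lines = split;
            next = state;
        }

        boolean reachedEnd() {
            return next >= lines.size();
        }

        String nextRecord() {
            return lines.get(next++);
        }
    }

    /** Reads all remaining records of the currently opened split. */
    static List<String> drain(LineFormat format) {
        List<String> out = new ArrayList<>();
        while (!format.reachedEnd()) {
            out.add(format.nextRecord());
        }
        return out;
    }
}
```

A format that can only be opened with open() has to start again at the first
record after a restore, whereas reopen() with the saved state skips the
records that were already emitted before the checkpoint.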

Best, Fabian

On Mon, 27 Aug 2018 at 10:55, Averell <lvhuyen@gmail.com> wrote:

> Hello Fabian, and all,
>
> Please excuse me for digging this old thread up.
> I have a question regarding the sending of the "barrier" messages in Flink's
> checkpointing mechanism: I want to know when those barrier messages are sent
> when I am using a file source. Where can I find this in the source code?
>
> I'm still stuck on my 20,000 small files issue, where all 20,000 files
> appear to the ContinuousFileMonitoringFunction at the same time.
> It takes only a few seconds to list all those files, but it is expected
> to take about 5 minutes to have those 20K files processed through to my sink.
> Due to a resource limitation, my job fails after about 3 minutes.
> What happens after that is that the job crashes, gets restored, tries to
> process all 20K files from file 1 again, and ultimately fails again after 3
> minutes. It goes into an infinite loop.
>
> I think that this is the expected behaviour, as my current checkpoint config
> is to checkpoint every 10s, and it took only a second or two for the listing
> of those 20K files. Am I correct here? And do we have a solution for this?
>
> Thanks and best regards,
> Averell
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>
