flink-dev mailing list archives

From Fabian Hueske <fhue...@gmail.com>
Subject Re: [DISCUSS] Handling event-time in continuous file processing.
Date Fri, 09 Dec 2016 08:09:28 GMT
Hi Kostas,

I think it would be good to open two JIRAs to track these issues:

1) document the shortcomings of the current solution
2) propose a solution based on your idea of group-ids.

Would you like to do that?


2016-12-01 10:48 GMT+01:00 Fabian Hueske <fhueske@gmail.com>:

> Hi Kostas,
> Thanks for bringing up this issue and the good explanation!
> I think we need to do two things:
> 1) Clearly explain the limitations of the current version in the online
> documentation and JavaDocs.
> This should point out that the source only works correctly with
> event-time and timestamp/watermark assigners if the timestamps of records,
> which are read from files in mod-time order, are monotonically increasing.
> 2) Implement a solution that handles event-time for files from different
> sources correctly.
> I like your proposal to define file-groups based on the file name.
> I believe in some sense this is similar to the problem of reading from
> different Kafka partitions and assigning watermarks there.
> There are different ways to implement this. Either we assign complete
> file-groups to readers and keep the current watermark there (similar to how
> the Kafka consumer works, I assume),
> or we allow splits to be distributed to arbitrary readers and spend
> a bit more effort on tracking the watermarks for each file-group.
> In either case we might need to know the set of file-groups in advance or
> find a good way to deal with files that appear and start a new file-group.
> This is not trivial and we need a good design for this.
> Cheers,
> Fabian
> 2016-12-01 10:05 GMT+01:00 Kostas Kloudas <k.kloudas@data-artisans.com>:
>> Hi all,
>> This is to open a discussion on how to better handle event-time in
>> continuous file processing.
>> For the sake of illustration of the problem we will use the example of
>> processing hourly server logs.
>> In this case, each server writes its logs in hourly files, with names:
>> server-id-timestamp
>> Assumptions:
>>         1) we have two servers producing logs, server-1 and server-2
>>         2) they have produced one file each, e.g. for 10am to 11am, so
>> server-1-10, server-2-10
>>         3) our job has a parallelism of 2, so the
>> ContinuousFileMonitoringFunction has parallelism 1 and the reader 2
>>         4) records within each file have timestamps in order, or
>> moderately out-of-order
>>         5) each log file is split into two splits by the underlying
>> filesystem, e.g. server-1-10-1 and server-1-10-2
>> In the scenario above, and in the current implementation of the
>> continuous file processing, the monitoring function will:
>>         1) sort the files on ascending modification time,
>>         2) compute the splits of each of the files and
>>         3) forward the splits in order of the modification timestamp and
>> their offset in the file to the downstream readers randomly.
>> Given the above, reader-1 will take server-1-10-1, and reader-2,
>> server-1-10-2.
>> Focusing on reader-1, as soon as it gets its split, it will start reading
>> the contained elements and assign timestamps to them
>> based on a user-specified timestamp extractor (this may happen later in
>> the pipeline, but it does not break the generality of the problem).
>> In addition, given that we are operating in event time, the reader will
>> also start emitting watermarks based on the timestamps
>> assigned to the elements it has read.
>> In this case, after processing server-1-10-1 and server-1-10-2 by the 2
>> readers, the watermark will have advanced somewhere in the
>> middle of the timestamps included in the file (files have logs for 10 to
>> 11 am).
>> In this case, when the splits of file server-2-10 are to be processed,
>> elements in the beginning of the file will be dropped as late.
>> Proposed Solution:
>> To address this, we could do the following:
>> 1) Split the files (and their corresponding splits) into file-groups, e.g.
>> based on a user-specified parser of the filename.
>> 2) Files/splits within the same file-group should be ordered so that
>> server-1-10 is processed
>> before server-1-11. This can be done through the same filename parser
>> mentioned before.
>> 3) In each reader task, keep a watermark emitter/ timestamp extractor and
>> a (candidate) watermark per file-group. The watermark
>> emitted by each task should be the minimum across all its file-groups.
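
To make the three steps above concrete, here is a minimal sketch in plain Python (not Flink code). Everything in it is an assumption made for illustration: the names `parse_file_name` and `FileGroupWatermarks` are hypothetical, the filename layout is taken to be `<server-id>-<hour>` as in the example, the candidate watermark of a file-group is simply the largest timestamp seen so far, and the set of file-groups is known in advance.

```python
import re
from typing import Dict, Iterable, Optional, Tuple

# Assumed filename layout "<server-id>-<hour>", e.g. "server-1-10".
# This parses file names, not split names such as "server-1-10-1".
_FILE_NAME = re.compile(r"^(?P<group>.+)-(?P<hour>\d+)$")


def parse_file_name(name: str) -> Tuple[str, int]:
    """Return (file-group, position within the group) for a file name."""
    match = _FILE_NAME.match(name)
    if match is None:
        raise ValueError(f"unexpected file name: {name!r}")
    return match.group("group"), int(match.group("hour"))


class FileGroupWatermarks:
    """Keeps one candidate watermark per file-group inside a reader task."""

    def __init__(self, groups: Iterable[str]) -> None:
        # None means "no element of this file-group has been read yet".
        self._candidates: Dict[str, Optional[int]] = {g: None for g in groups}

    def observe(self, group: str, timestamp: int) -> None:
        """Record an element timestamp; candidates never move backwards."""
        if group not in self._candidates:
            raise KeyError(f"unknown file-group: {group!r}")
        current = self._candidates[group]
        if current is None or timestamp > current:
            self._candidates[group] = timestamp

    def current_watermark(self) -> Optional[int]:
        """Minimum over all file-groups, or None while any group is unseen."""
        values = list(self._candidates.values())
        if not values or any(v is None for v in values):
            return None
        return min(values)


if __name__ == "__main__":
    # Step 2: order files within a group using the same parser.
    files = sorted(["server-1-11", "server-2-10", "server-1-10"], key=parse_file_name)
    print(files)

    # Step 3: the emitted watermark is held back by the slowest file-group.
    tracker = FileGroupWatermarks(["server-1", "server-2"])
    tracker.observe("server-1", 1030)
    print(tracker.current_watermark())
    tracker.observe("server-2", 1005)
    print(tracker.current_watermark())
```

With this bookkeeping the watermark stays unset until every file-group has produced at least one element, so the first records of server-2-10 are not dropped as late. The sketch deliberately rejects unknown file-groups; how to handle a file that starts a new group at runtime is exactly the open design question raised in this thread.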
