flink-user mailing list archives

From Timo Walther <twal...@apache.org>
Subject Re: Watermark Question on Failed Process
Date Tue, 03 Apr 2018 11:58:14 GMT
Hi Chengzhi,

If you emit a watermark while there is still data with a lower
timestamp, you generate "late data". That data either needs to be
processed in a separate branch of your pipeline (see
sideOutputLateData() [1]), or your existing operators must update their
previously emitted results. The latter means holding state, or the
contents of your windows, for longer (see allowedLateness() [1]).

In general, I think a watermark strategy based on processing time is
probably not suitable for reprocessing. Either you parameterize your
watermark generator so that you can pass information to it through job
parameters, or you use another strategy such as
BoundedOutOfOrdernessTimestampExtractor [2] together with sinks that
allow idempotent updates.
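
To make the difference concrete, here is a minimal plain-Scala sketch
(no Flink dependency) that models both strategies for the scenario in
the question: a 3 hour lag and a restart 12 hours after the failure.
The object name, the helper names, and the one-record-per-hour backlog
are assumptions made for illustration. They are not Flink APIs, and the
lateness check is a simplified model of what Flink operators do.

```scala
// Simplified model of two watermark strategies; not Flink code.
object WatermarkSketch {
  val hour: Long = 60L * 60L * 1000L

  // Processing-time lag: the watermark follows the wall clock.
  def timeLagWatermark(nowMillis: Long, maxTimeLag: Long): Long =
    nowMillis - maxTimeLag

  // Bounded out-of-orderness: the watermark follows the largest
  // event timestamp observed so far, minus a fixed bound.
  final class BoundedOutOfOrderness(maxOutOfOrderness: Long) {
    private var maxSeen: Long = Long.MinValue + maxOutOfOrderness
    def observe(eventTime: Long): Unit =
      maxSeen = math.max(maxSeen, eventTime)
    def currentWatermark: Long = maxSeen - maxOutOfOrderness
  }

  // In this model a record is late if its timestamp is at or
  // below the current watermark.
  def isLate(eventTime: Long, watermark: Long): Boolean =
    eventTime <= watermark

  // Returns (late records with time lag, late records with bounded
  // out-of-orderness) for a hypothetical 12 hour backlog.
  def lateCounts(): (Int, Int) = {
    val stoppedAt = 100 * hour               // hypothetical failure time
    val restartedAt = stoppedAt + 12 * hour  // job resumes 12 hours later
    // One record per hour of downtime, read in timestamp order.
    val backlog = (0 until 12).map(i => stoppedAt + i * hour)

    // The wall-clock watermark is already far ahead of the backlog.
    val lagWatermark = timeLagWatermark(restartedAt, 3 * hour)
    val lateWithLag = backlog.count(ts => isLate(ts, lagWatermark))

    // The event-time watermark only advances as records are read.
    val bounded = new BoundedOutOfOrderness(3 * hour)
    var lateWithBounded = 0
    for (ts <- backlog) {
      if (isLate(ts, bounded.currentWatermark)) lateWithBounded += 1
      bounded.observe(ts)
    }
    (lateWithLag, lateWithBounded)
  }
}
```

In this model, WatermarkSketch.lateCounts() classifies ten of the twelve
backlog records as late under the time-lag strategy and none under the
bounded out-of-orderness strategy, because the second watermark depends
only on the timestamps in the data and not on when the job runs.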

I hope this helps.



On 02.04.18 at 23:51, Chengzhi Zhao wrote:
> Hello, Flink community,
> I am using periodic watermarks and extract the event time from each
> record in files in S3. I am using the `TimeLagWatermarkGenerator`
> as described in the Flink documentation.
> Currently, a new watermark is generated from processing time minus a
> fixed amount:
> override def getCurrentWatermark: Watermark = {
>     new Watermark(System.currentTimeMillis() - maxTimeLag)
> }
> This works fine as long as the process is running. However, in case
> of failures, for example bad data or an out-of-memory error, I need
> to stop the process and it takes me time to get it back up. Say
> maxTimeLag = 3 hours and it took me 12 hours to notice and fix the
> problem.
> My question is: since I am using processing time as part of the
> watermark, when Flink resumes from a failure, might some records be
> ignored because of the watermark? And what's the best practice to
> catch up and continue without losing data? Thanks!
> Best,
> Chengzhi
