flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Eron Wright <eronwri...@gmail.com>
Subject Re: Handle event time
Date Mon, 11 Sep 2017 17:48:47 GMT
As mentioned earlier, the watermark is the basis for reasoning about the
overall progression of time.   Many operators use the watermark to
correctly organize records, e.g. into the correct time-based window.
Within that window the records may still be unordered.   That said, some
operators do take pains to reorder the records, notably the Flink CEP
operator to correctly detect temporal patterns.  Basically, the operator
buffers records until a watermark arrives; all buffered records older than
the watermark may then be sorted and processed.

It is tempting to write a standalone operator that simply reorders records
as described, but subsequent repartitioning to downstream operators would
reintroduce disorder.  Therefore one must ensure that subsequent processing
is done with a 'forward' partitioning strategy.

Hope this helps!
Eron

On Fri, Sep 8, 2017 at 3:50 AM, AndreaKinn <kinn6aer@hotmail.it> wrote:

> Thank you, effectively I developed also a simple custom solution for
> watermark looking at flink doc but anyway I see unordered printed streams.
> I have a doubt about flink behaviour: if I understand, flink doesn't
> perform
> automatically reordering of records in a stream, so if for instance a
> record
> arrives in late what is the behaviour of flink? In the doc it's described
> that elements arrive after in late are dropped (allowed lateness default
> value is 0) but also using this watermark emitter:
>
> *public class CustomTimestampExtractor implements
> AssignerWithPeriodicWatermarks<Tuple6&lt;String, String, Date, String,
> String, Double>>{
>
>         private static final long serialVersionUID = 5448621759931440489L;
>         private final long maxOutOfOrderness = 0;
>     private long currentMaxTimestamp;
>
>         @Override
>         public long extractTimestamp(Tuple6<String, String, Date, String,
> String,
> Double> element, long previousElementTimestamp) {
>                 long timestamp = element.f2.getTime();
>                 currentMaxTimestamp = Math.max(timestamp,
> currentMaxTimestamp);
>                 return timestamp;
>         }
>
>         @Override
>         public Watermark getCurrentWatermark() {
>                 return new Watermark(currentMaxTimestamp -
> maxOutOfOrderness);
>         }
> }*
>
> with maxOutOfOrderness = 0 I see unordered record in the stream.
>
> What I want to obtain is a fully ordered stream, is there a way to
> implement
> it?
>
>
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.
> n4.nabble.com/
>

Mime
View raw message