flink-user mailing list archives

From Fabian Hueske <fhue...@gmail.com>
Subject Re: Explanation of the output of timeWindowAll(Time.milliseconds(3))
Date Tue, 29 Dec 2015 00:05:40 GMT
Hi Nirmalya,

event-time timers (such as an event-time trigger that computes a window) fire
when a watermark is received whose value is larger than the trigger's
timestamp. By default, watermarks are emitted at a fixed time interval,
i.e., every x milliseconds: whenever a new watermark is due, Flink asks for
the currently valid watermark value. If a window operator receives a
watermark that closes multiple windows, the order in which those windows are
computed is effectively random.

In your case, you are reading data from a file, which is very fast, so
several windows are already complete when the first watermark is received.
The order in which these windows are computed and their results emitted is
random.
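The effect can be sketched in plain Scala (no Flink; the 3 ms window size and
the timestamps are taken from this thread, while the object and method names
are hypothetical and just model the behavior):

```scala
// Models how a single watermark can close several tumbling event-time
// windows at once (a sketch, not Flink's actual implementation).
object WatermarkSketch {
  val windowSizeMs = 3L

  // Tumbling window [start, start + windowSizeMs) that a timestamp falls into.
  def windowStart(ts: Long): Long = ts - (ts % windowSizeMs)

  // Given buffered (timestamp, value) pairs and a watermark, return the
  // windows that are complete (window end <= watermark), keyed by start.
  def closedWindows(events: Seq[(Long, Double)],
                    watermark: Long): Map[Long, Seq[(Long, Double)]] =
    events.groupBy(e => windowStart(e._1))
      .filter { case (start, _) => start + windowSizeMs <= watermark }

  def main(args: Array[String]): Unit = {
    val events = Seq((1448028160L, 30.02), (1448028163L, 27.83),
                     (1448028166L, 32.06))
    // One late watermark (e.g. at the end of a fast file read) closes all
    // three windows at once; no ordering among them is guaranteed, which is
    // mirrored here by Map's unspecified iteration order.
    val closed = closedWindows(events, watermark = 1448028169L)
    println(closed.size) // prints 3
  }
}
```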

You can configure the watermark interval
with ExecutionConfig.setAutoWatermarkInterval(long milliseconds).
Alternatively, you can implement a source function that emits watermarks
itself.
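For reference, the two options look roughly like this (a configuration sketch
against the Flink streaming API of that era; the EventTimeSource class name,
the 10 ms interval, and the (timestamp, temperature) record shape are
assumptions for illustration):

```scala
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.watermark.Watermark

// Option 1: shorten the automatic watermark interval (10 ms is arbitrary).
// env.getConfig.setAutoWatermarkInterval(10L)

// Option 2: a source that emits watermarks itself.
class EventTimeSource(records: Seq[(Long, Double)])
    extends SourceFunction[(Long, Double)] {

  override def run(ctx: SourceFunction.SourceContext[(Long, Double)]): Unit =
    for ((ts, temp) <- records) {
      ctx.collectWithTimestamp((ts, temp), ts) // attach the event timestamp
      ctx.emitWatermark(new Watermark(ts))     // advance event time per record
    }

  override def cancel(): Unit = ()
}
```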

Best,
Fabian



2015-12-26 18:01 GMT+01:00 Nirmalya Sengupta <sengupta.nirmalya@gmail.com>:

> Hello Fabian <fhueske@gmail.com>
>
> Merry Christmas to you and everyone else in this forum.
>
> Another neophyte's question, patience please.
>
> I have the following code:
>
>     val env = StreamExecutionEnvironment.createLocalEnvironment(1)
>
>     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>
>     val readings =
>       readIncomingReadings(env,"./sampleIOT.csv")
>       .map(e => (e.timeStamp,e.ambientTemperature))
>       .assignAscendingTimestamps(pairOfReadings => pairOfReadings._1)
>       .timeWindowAll(Time.milliseconds(3))
>       .maxBy(1)
>
>
> In the data file, the timestamp is the second field from the right (first
> few records only):
>
> probe-42a9ddca,193,819.12,74.3712,1448028161,22.07
> probe-252a5bbd,197,710.32,80.6072,1448028161,14.64
> probe-987f2cb6,200,750.4,76.0533,1448028161,14.72
> probe-24444323,197,816.06,84.0816,1448028161,4.405
> probe-6dd6fdc4,201,717.64,78.4031,1448028161,29.43
> probe-20c609fb,204,804.37,84.5243,1448028161,22.87
> probe-c027fdc9,195,858.61,81.7682,1448028161,24.47
> probe-2c6cd3de,198,826.96,85.26,1448028162,18.99
> probe-960906ca,197,797.63,77.4359,1448028162,27.62
> probe-16226f9e,199,835.5,81.2027,1448028162,18.82
> probe-4de4e64b,200,851.04,80.5296,1448028162,27.43
> .......
>
>
> The output is:
>
> (1448028163,27.83)
> (1448028166,32.06)
> (1448028160,30.02)
>
> The contents are correct, but I am not sure about the *order in which
> they appear*. Because I am using
>
> val env = StreamExecutionEnvironment.createLocalEnvironment(1)  // only one thread anyway
>
>
> and the timestamps are guaranteed to be in *ascending order* (I sorted the
> CSV before using it), my expectation is that Flink should print the output
> as:
>
> (1448028160,30.02)
> (1448028163,27.83)
> (1448028166,32.06)
>
> How do I explain the randomness?
>
> -- Nirmalya
>
> --
> Software Technologist
> http://www.linkedin.com/in/nirmalyasengupta
> "If you have built castles in the air, your work need not be lost. That is
> where they should be.
> Now put the foundation under them."
>
