flink-user mailing list archives

From Hequn Cheng <chenghe...@gmail.com>
Subject Re: Why data didn't enter the time window in EventTime mode
Date Thu, 19 Jul 2018 13:48:22 GMT
Hi Soheil,
You can monitor the watermarks in the web dashboard, as Fabian said. The
relevant documentation is here [1].

[1]
https://ci.apache.org/projects/flink/flink-docs-master/monitoring/debugging_event_time.html#monitoring-current-event-time

On Thu, Jul 19, 2018 at 3:53 PM, Fabian Hueske <fhueske@gmail.com> wrote:

> Hi Soheil,
>
> Hequn is right. This might be an issue with advancing event-time.
> You can monitor that by checking the watermarks in the web dashboard, or
> print-debug it with a ProcessFunction, which can look up the current
> watermark.
>
> Best, Fabian
>
> 2018-07-19 3:30 GMT+02:00 Hequn Cheng <chenghequn@gmail.com>:
>
>> Hi Soheil,
>>
>> > wait 8 milliseconds (according to my code) to see if any other data
>> with the same key received or not and after 8 millisecond it will be
>> triggered.
>> Yes, but the time is event time, so if there is no data from the source, the
>> time won't advance.
>>
>> There are several possible reasons why the event time has not advanced:
>> 1. There is no data from the source.
>> 2. One of the parallel source instances has no data.
>> 3. The time field, i.e., the Long in Tuple3, is in seconds; it should be in
>> milliseconds.
>> 4. The data does not cover a longer time span than the window size, which is
>> needed to advance the event time.
>>
>> Best, Hequn
>>
>> On Wed, Jul 18, 2018 at 3:53 PM, Soheil Pourbafrani <
>> soheil.ir08@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> In a datastream processing problem, the source generates data every 8
>>> milliseconds and the timestamp is a field of the data. With Flink's default
>>> time behavior, data enters the time window, but when I set Flink time to
>>> EventTime it outputs nothing! Here is the code:
>>>
>>> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>>>
>>> SingleOutputStreamOperator<Tuple3<String, Long, JSONObject>> res = aggregatedTuple
>>>         .assignTimestampsAndWatermarks(
>>>             new BoundedOutOfOrdernessTimestampExtractor<Tuple3<String, Long, JSONObject>>(Time.milliseconds(8)) {
>>>                 @Override
>>>                 public long extractTimestamp(Tuple3<String, Long, JSONObject> element) {
>>>                     return element.f1;
>>>                 }
>>>             })
>>>         .keyBy(1)
>>>         .timeWindow(Time.milliseconds(8))
>>>         .allowedLateness(Time.milliseconds(3))
>>>         .sideOutputLateData(lateOutputTag)
>>>         .reduce(processing...);
>>> DataStream<Tuple3<String, Long, JSONObject>> lateData = res.getSideOutput(lateOutputTag);
>>> res.print();
>>>
>>> What is the problem with my code?
>>> According to the Flink documentation, my understanding of EventTime is
>>> that, for example with a time window, when a new element arrives it starts a
>>> new (logical) window based on that element's event timestamp and waits 8
>>> milliseconds (according to my code) to see whether any other data with the
>>> same key arrives, and after 8 milliseconds (from the timestamp of the first
>>> element of the window) it is triggered. Since the data source generates
>>> data at a constant periodic interval, I set a watermark of 8, too. Is my
>>> understanding of Flink windows in EventTime correct?
>>>
>>
>>
>
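
To make reasons 3 and 4 from Hequn's list concrete, here is a minimal sketch in plain Java (no Flink dependency) of how an 8 ms tumbling window reacts to a bounded-out-of-orderness watermark. It is a simplified model, not Flink's implementation: the class and method names are hypothetical, the watermark is recomputed after every element (Flink emits watermarks periodically), there is a single source instance, and allowed lateness is ignored. It assumes the usual rules that the watermark is the largest timestamp seen minus the out-of-orderness bound, and that a window fires once the watermark reaches the window's last timestamp (end minus 1).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Hypothetical helper, for illustration only; not part of Flink.
class WatermarkSketch {

    /** Returns the start timestamps of the tumbling windows that fire, in firing order. */
    static List<Long> firedWindows(long[] timestamps, long windowSize, long maxOutOfOrderness) {
        TreeSet<Long> openWindows = new TreeSet<>();
        List<Long> fired = new ArrayList<>();
        long maxTimestamp = Long.MIN_VALUE;
        long watermark = Long.MIN_VALUE;

        for (long ts : timestamps) {
            // Tumbling windows are aligned to multiples of the window size.
            long start = ts - Math.floorMod(ts, windowSize);
            // Elements whose window has already passed the watermark are late; drop them.
            if (start + windowSize - 1 > watermark) {
                openWindows.add(start);
            }
            // Assumed rule: watermark = largest timestamp seen - out-of-orderness bound.
            maxTimestamp = Math.max(maxTimestamp, ts);
            watermark = maxTimestamp - maxOutOfOrderness;
            // A window fires once the watermark reaches its last timestamp (end - 1).
            while (!openWindows.isEmpty() && openWindows.first() + windowSize - 1 <= watermark) {
                fired.add(openWindows.pollFirst());
            }
        }
        return fired;
    }

    public static void main(String[] args) {
        // Millisecond timestamps spanning more than the window size: windows fire.
        System.out.println(firedWindows(new long[] {0, 8, 16, 24, 32}, 8, 8)); // [0, 8, 16]
        // Data covering less than the window size: nothing fires (reason 4).
        System.out.println(firedWindows(new long[] {0, 3, 5}, 8, 8)); // []
        // Second-resolution timestamps read as milliseconds barely advance (reason 3).
        System.out.println(firedWindows(new long[] {100, 100, 100, 101, 101}, 8, 8)); // []
    }
}
```

In the first call the window starting at 0 fires only when the element with timestamp 16 arrives, because the watermark trails the largest timestamp by 8 ms. The windows starting at 24 and 32 never fire in this model, since no later element pushes the watermark past them.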
