flink-user mailing list archives

From Guowei Ma <guowei....@gmail.com>
Subject Re: Timestamp Watermark Assigner bound question
Date Mon, 15 Apr 2019 01:44:02 GMT
Hi, Vijay

>>>Then the Operator progresses to the next Watermark as a starting point
for events after event time reaches currWatermark ?
AFAIK, the operator that generates watermarks is called by the framework.
When it is called depends on the kind of assigner. For example, an
operator that implements the AssignerWithPunctuatedWatermarks interface
is called for every element, while one that implements
AssignerWithPeriodicWatermarks is asked for a watermark at a fixed interval.
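As a rough illustration of how often each kind of assigner is consulted, here is a plain-Java sketch. It does not use Flink: the class and method names are hypothetical, the 500 ms bound is arbitrary, and polling "every N elements" is an assumed stand-in for the wall-clock auto-watermark interval that Flink 1.x configures with `ExecutionConfig#setAutoWatermarkInterval`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical plain-Java model of assigner invocation; not Flink's API.
class AssignerInvocationSketch {

    // Punctuated style: the framework consults the assigner for every element,
    // so every element yields a candidate watermark.
    static List<Long> punctuatedCandidates(long[] timestamps, long bound) {
        List<Long> candidates = new ArrayList<>();
        for (long ts : timestamps) {
            candidates.add(ts - bound);
        }
        return candidates;
    }

    // Periodic style: elements only update the max timestamp seen so far; the
    // framework asks for a watermark once every pollEvery elements (an assumed
    // stand-in for the wall-clock auto-watermark interval).
    static List<Long> periodicCandidates(long[] timestamps, long bound, int pollEvery) {
        List<Long> candidates = new ArrayList<>();
        long maxTs = Long.MIN_VALUE + bound; // avoids underflow in maxTs - bound
        for (int i = 0; i < timestamps.length; i++) {
            maxTs = Math.max(maxTs, timestamps[i]);
            if ((i + 1) % pollEvery == 0) {
                candidates.add(maxTs - bound);
            }
        }
        return candidates;
    }

    public static void main(String[] args) {
        long[] ts = {1000, 1200, 1100, 1500, 1800, 1700, 2100, 2500};
        System.out.println(punctuatedCandidates(ts, 500).size()); // 8
        System.out.println(periodicCandidates(ts, 500, 4));        // [1000, 2000]
    }
}
```

With eight elements the punctuated style produces eight candidate watermarks and the periodic style two, which is why the periodic assigner is usually the cheaper choice.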

>>>How does it guarantee that watermark never goes backwards ?
Whether the watermark generated by the
AssignerWithPunctuatedWatermarks/AssignerWithPeriodicWatermarks is sent
downstream is controlled by the framework. If an assigner returns a
watermark that goes backwards, Flink does not send it downstream.
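A minimal plain-Java sketch of that forwarding rule, assuming (as I understand the Flink 1.x watermark-assigning operators) that a watermark is forwarded only when it is strictly larger than the last one forwarded. The class is hypothetical, not Flink's operator code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical plain-Java model of the framework's forwarding rule; not Flink code.
class WatermarkForwardingSketch {

    private long currentWatermark = Long.MIN_VALUE;
    private final List<Long> forwarded = new ArrayList<>();

    // Called with whatever the user's assigner returned (null = no watermark).
    void onCandidate(Long candidate) {
        // Only a strictly larger watermark is forwarded downstream; one that
        // goes backwards (or repeats) is silently dropped.
        if (candidate != null && candidate > currentWatermark) {
            currentWatermark = candidate;
            forwarded.add(candidate);
        }
    }

    List<Long> forwarded() {
        return forwarded;
    }

    public static void main(String[] args) {
        WatermarkForwardingSketch op = new WatermarkForwardingSketch();
        Long[] fromAssigner = {1000L, 3000L, null, 2000L, 3000L, 4000L};
        for (Long wm : fromAssigner) {
            op.onCandidate(wm);
        }
        System.out.println(op.forwarded()); // [1000, 3000, 4000]
    }
}
```

If I read the 1.7 source correctly, BoundedOutOfOrdernessTimestampExtractor additionally keeps its own `lastEmittedWatermark` and only moves it forward when `potentialWM` is at least as large, so the comment in the snippet quoted below refers to that check rather than to the subtraction line alone.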

Best,
Guowei


Vijay Balakrishnan <bvijaykr@gmail.com> wrote on Wed, Apr 10, 2019 at 11:44 PM:

> Hi Guowei,
> Thx for your reply.
> I am trying to understand the logic behind point 1, i.e. the current
> watermark being currMaxTimestamp minus the bound.
> So, does this mean the Operator processing a task has a current Event time
> < current Watermark < currMaxTimestamp ??? Then the Operator progresses to
> the next Watermark as a starting point for events after event time reaches
> currWatermark ?
> Also, I saw this comment in BoundedOutOfOrdernessTimestampExtractor.java.
>
> // this guarantees that the watermark never goes backwards.
> long potentialWM = currentMaxTimestamp - maxOutOfOrderness;
>
>
> How does it guarantee that watermark never goes backwards ?
>
> TIA,
>
> Vijay
>
>
>
> On Tue, Apr 9, 2019 at 10:50 PM Guowei Ma <guowei.mgw@gmail.com> wrote:
>
>> Hi,
>> 1. From doc[1]: a Watermark(t) declares that event time has reached time
>> t in that stream, meaning that there should be no more elements from the
>> stream with a timestamp t' <= t (i.e. events with timestamps older than or
>> equal to the watermark). So it is counterintuitive to generate a watermark
>> that is larger than the timestamp of the current element. At the least,
>> you should subtract the bound.
>> 2. From the definition of a watermark, I think the watermark is not related
>> to the window length. The bound depends on your application.
>> 3. In your case AssignerWithPunctuatedWatermarks might not be a good
>> choice. Watermarks are not free, and you might send too many of them. If
>> your source can generate special "watermark" elements, I think you could
>> use that interface. Otherwise, you could choose
>> AssignerWithPeriodicWatermarks. You can find an example in doc[2].
>>
>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/event_time.html#event-time-and-watermarks
>> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/event_timestamps_watermarks.html#timestamp-assigners--watermark-generators
>> Best,
>> Guowei
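To see why subtracting the bound matters for point 1, the plain-Java sketch below (hypothetical names, not Flink's API) replays a slightly out-of-order stream through a per-element assigner, as in the MonitoringTSWAssigner further down, and counts elements that arrive with a timestamp at or below the current watermark, i.e. elements the watermark has already declared complete. Because only increasing watermarks are kept, `ts - bound` per element behaves like `maxTimestampSeen - bound`.

```java
import java.util.function.LongUnaryOperator;

// Hypothetical plain-Java model; not Flink's API. Counts elements whose timestamp
// is <= the current watermark when they arrive, i.e. elements the watermark has
// already declared "complete" (the definition quoted in point 1).
class BoundDirectionSketch {

    static int countBehindWatermark(long[] timestamps, LongUnaryOperator watermarkFor) {
        long currentWatermark = Long.MIN_VALUE;
        int behind = 0;
        for (long ts : timestamps) {
            if (ts <= currentWatermark) {
                behind++; // this element violates the watermark's promise
            }
            long candidate = watermarkFor.applyAsLong(ts);
            if (candidate > currentWatermark) {
                currentWatermark = candidate; // watermarks only move forward
            }
        }
        return behind;
    }

    public static void main(String[] args) {
        long bound = 5_000;
        // Slightly out-of-order event times in ms; none is more than 5 s late.
        long[] ts = {10_000, 12_000, 11_000, 14_000, 13_500, 16_000};
        System.out.println(countBehindWatermark(ts, t -> t + bound)); // 5
        System.out.println(countBehindWatermark(ts, t -> t - bound)); // 0
    }
}
```

With `+ bound` every element after the first is already behind the watermark; with `- bound` none is, because no element here is more than 5 s out of order.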
>>
>>
>> Vijay Balakrishnan <bvijaykr@gmail.com> wrote on Wed, Apr 10, 2019 at 7:41 AM:
>>
>>> Hi,
>>> I have created a TimestampAssigner as follows.
>>> I want to use monitoring.getEventTimestamp() with event-time
>>> processing and collect aggregated stats over time window intervals of 5
>>> secs, 5 mins, etc. Is this the right way to create the TimeWaterMarkAssigner
>>> with a bound? I want to collect the stats for each eventTimestamp + window
>>> interval. My understanding: the generated watermark, which is
>>> eventTimestamp + bound, will collect all the eventTimestamps that arrive
>>> within that watermark inside each eventTimestamp + 5s (etc.) window interval.
>>> Or does this bound have to be based on the window interval, i.e.
>>> extractedTimestamp + windowInterval + bound?
>>>
>>>
>>>> import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
>>>> import org.apache.flink.streaming.api.watermark.Watermark;
>>>>
>>>> public class MonitoringTSWAssigner implements
>>>>         AssignerWithPunctuatedWatermarks<Monitoring> {
>>>>
>>>>     private long bound = 5 * (long) 1000;
>>>>
>>>>     @Override
>>>>     public long extractTimestamp(Monitoring monitoring, long previousTS) {
>>>>         return monitoring.getEventTimestamp();
>>>>     }
>>>>
>>>>     @Override
>>>>     public Watermark checkAndGetNextWatermark(Monitoring monitoring,
>>>>             long extractedTimestamp) {
>>>>         return new Watermark(extractedTimestamp + bound); //<==== should it be - bound ?
>>>>     }
>>>> }
>>>
>>>
>>> Used here:
>>>
>>>> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>>>> final DataStreamSource<Monitoring> monitoringDataStreamSource =
>>>>         env.addSource(....);
>>>> DataStream<Monitoring> kinesisStream =
>>>>         monitoringDataStreamSource.assignTimestampsAndWatermarks(new MonitoringTSWAssigner());
>>>> KeyedStream<Monitoring, Tuple> monitoringTupleKeyedStream =
>>>>         kinesisStream.keyBy("deployment", .....);
>>>> final WindowedStream<Monitoring, Tuple, TimeWindow> windowStream =
>>>>         monitoringTupleKeyedStream.timeWindow(Time.seconds(5)); // 5 sec time window
>>>
>>>
>>> TIA,
>>>
>>
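On the window question in the first message: the bound does not need to include the window interval. Below is a plain-Java arithmetic sketch under stated assumptions: zero window offset, non-negative millisecond timestamps, watermark = maxTimestampSeen - bound, and an event-time tumbling window [start, start + size) that fires once the watermark reaches start + size - 1 (which is how I understand Flink's default event-time trigger). The class and method names are hypothetical.

```java
// Hypothetical plain-Java arithmetic sketch (not Flink's API). Assumes zero window
// offset, non-negative epoch-millisecond timestamps, watermark = maxSeen - bound,
// and that a window [start, start + size) fires once watermark >= start + size - 1.
class TumblingWindowSketch {

    // Tumbling windows are aligned to the epoch, not to each element's timestamp.
    static long windowStart(long ts, long size) {
        return ts - (ts % size);
    }

    // Smallest max-seen event timestamp that makes the window fire.
    static long maxSeenNeededToFire(long windowStart, long size, long bound) {
        return windowStart + size - 1 + bound;
    }

    public static void main(String[] args) {
        long fiveSec = 5_000, fiveMin = 300_000, bound = 5_000;
        for (long ts : new long[] {10_000, 12_000, 14_999, 15_000}) {
            System.out.println(ts + " -> window start " + windowStart(ts, fiveSec));
        }
        // The bound adds the same 5 s of delay whatever the window length is.
        System.out.println(maxSeenNeededToFire(10_000, fiveSec, bound)); // 19999
        System.out.println(maxSeenNeededToFire(0, fiveMin, bound));      // 304999
    }
}
```

Elements at 10 s, 12 s and 14.999 s all land in the same window [10 s, 15 s) rather than each opening its own eventTimestamp + 5 s window, and the 5 s bound delays firing by the same 5 s for both the 5-second and the 5-minute window.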
