flink-user mailing list archives

From Aljoscha Krettek <aljos...@apache.org>
Subject Re: Flink CEP with event time
Date Thu, 04 Jan 2018 10:42:21 GMT
Yes, because event-time only advances if something makes it advance. Basically.
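
To make that concrete, here is a rough sketch of the mechanics in plain Scala. It is a toy model, not Flink code: `EventTimeBuffer`, `onEvent`, and the timestamps are made-up names and values for illustration, and real Flink emits watermarks periodically rather than on every event. It assumes the watermark trails the highest timestamp seen by the out-of-orderness bound, and that buffered events are released in timestamp order once their timestamp is lower than or equal to the watermark.

```scala
// Toy model of event-time buffering. NOT Flink code; all names are hypothetical.
final case class Event(name: String, timestamp: Long)

final class EventTimeBuffer(maxOutOfOrdernessMs: Long) {
  private var maxTimestampSeen = Long.MinValue
  private var buffered = Vector.empty[Event]

  // The watermark trails the highest timestamp seen so far by the bound.
  // Before any event arrives there is no meaningful watermark yet.
  def watermark: Long =
    if (maxTimestampSeen == Long.MinValue) Long.MinValue
    else maxTimestampSeen - maxOutOfOrdernessMs

  // Buffers the event, then returns (sorted by timestamp) every buffered
  // event whose timestamp is lower than or equal to the current watermark.
  def onEvent(event: Event): Vector[Event] = {
    maxTimestampSeen = math.max(maxTimestampSeen, event.timestamp)
    val currentWatermark = watermark
    val (ready, waiting) =
      (buffered :+ event).partition(_.timestamp <= currentWatermark)
    buffered = waiting
    ready.sortBy(_.timestamp)
  }
}

object WatermarkDemo {
  def main(args: Array[String]): Unit = {
    val buffer = new EventTimeBuffer(maxOutOfOrdernessMs = 10000L)
    // 1.B arrives before 1.A although 1.A carries the earlier timestamp.
    println(buffer.onEvent(Event("1.B", 2000L)))  // nothing is released yet
    println(buffer.onEvent(Event("1.A", 1000L)))  // still nothing: watermark unchanged
    println(buffer.onEvent(Event("2.A", 15000L))) // watermark moves to 5000: 1.A, then 1.B
  }
}
```

In this model nothing is released for 1.A and 1.B until 2.A arrives, because only a new event with a higher timestamp moves the watermark. The same buffering is what puts A and B back into timestamp order before they are handed on.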

> On 4. Jan 2018, at 11:34, shashank agarwal <shashank734@gmail.com> wrote:
> 
> But this will be wrong in my case. So I have to wait for the results until I receive the next event?
> 
> 
> On Thu, Jan 4, 2018 at 3:53 PM, Aljoscha Krettek <aljoscha@apache.org> wrote:
> I think this is actually working as intended, going by your earlier description of when results are produced: when you see Event 1.B, the watermark has not advanced far enough to trigger computation. Only when you see Event 2.A does the watermark advance, and then you get a result. This is what I would expect to happen.
> 
> 
>> On 3. Jan 2018, at 19:46, shashank agarwal <shashank734@gmail.com> wrote:
>> 
>> @Dawid, I was using 1.3.2. I have also checked on 1.4.0 and am still facing the same issue.
>> 
>> 
>> @Aljoscha, I have to cover the case where B can come after A from Kafka. How can I achieve this, given that event time is not working? How should I implement this?
>> 
>>  A followedBy B.
>> 
>> I am using a Kafka source and my event APIs are behind load balancers, so sometimes B comes before A. As a result, my CEP pattern doesn't generate any result for those events.
>> 
>> I am trying to use event time like this. Am I doing anything wrong?
>> 
>> 
>>  kafkaSource.assignTimestampsAndWatermarks(
>>         new BoundedOutOfOrdernessTimestampExtractor[Event](Time.seconds(10)) {
>>           override def extractTimestamp(event: Event): Long = {
>>             try {
>>               val originTime = event.origTimestamp.getOrElse("0").toLong
>>               if(originTime <= 0)
>>                 {
>>                   val serverTime = event.serverTimestamp.getOrElse("0").toLong
>>                   if(serverTime <= 0)
>>                     {
>>                       System.currentTimeMillis()
>>                     }
>>                   else
>>                     {
>>                       serverTime
>>                     }
>>                 }
>>               else {
>>                 originTime
>>               }
>>             }
>>             catch {
>>               case e: Exception =>
>>                 // Log the failure, then fall back to the current processing time.
>>                 Log.error("OriginTimestamp Exception occurred", "error", e.printStackTrace);
>>                 System.currentTimeMillis()
>>             }
>>           }
>>         }
>>       )
>> 
>> On Wed, Jan 3, 2018 at 9:42 PM, Dawid Wysakowicz <wysakowicz.dawid@gmail.com> wrote:
>> Hi shashank,
>> 
>> What version of Flink are you using? Is it possible that you are hitting this issue: https://issues.apache.org/jira/browse/FLINK-7563 ?
>> 
>> The watermark semantics in CEP were buggy: an event was processed only if its timestamp was lower than the current watermark, whereas it should be processed if its timestamp is lower than or equal to the watermark.
>> 
>> Best
>> Dawid
>> 
>> > On 3 Jan 2018, at 17:05, shashank agarwal <shashank734@gmail.com> wrote:
>> >
>> > ssed A with origTimestamp Y. (
>> 
>> 
>> 
>> 
>> -- 
>> Thanks Regards
>> 
>> SHASHANK AGARWAL
>>  ---  Trying to mobilize the things....
>> 
> 
> 
> 
> 

