flink-user mailing list archives

From shashank agarwal <shashank...@gmail.com>
Subject Re: Flink CEP with event time
Date Thu, 04 Jan 2018 10:34:55 GMT
But this will be wrong in my case. So I have to wait for the results until
I receive the next event.
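
To make the behaviour described in the quoted reply below concrete, here is a
minimal plain-Scala sketch of how a bounded-out-of-orderness watermark lags
behind the events. It is a simplified simulation, not Flink code: the
EventTimeBuffer class, the timestamps and the per-event watermark update are
all assumptions made for illustration (Flink emits watermarks periodically,
and CEP does its own buffering internally).

```scala
// Plain-Scala simulation with no Flink dependency. All names are hypothetical.
final case class Event(id: String, timestamp: Long)

// Same idea as BoundedOutOfOrdernessTimestampExtractor:
// watermark = highest timestamp seen so far - allowed out-of-orderness.
final class EventTimeBuffer(maxOutOfOrdernessMs: Long) {
  private var maxTimestamp: Long = Long.MinValue + maxOutOfOrdernessMs
  private var pending: Vector[Event] = Vector.empty

  def watermark: Long = maxTimestamp - maxOutOfOrdernessMs

  // Buffers the event, advances the watermark, and returns (in timestamp
  // order) every buffered event whose timestamp is <= the new watermark.
  def add(event: Event): Vector[Event] = {
    pending = pending :+ event
    maxTimestamp = math.max(maxTimestamp, event.timestamp)
    val (ready, waiting) = pending.partition(_.timestamp <= watermark)
    pending = waiting
    ready.sortBy(_.timestamp)
  }
}

object WatermarkDemo {
  // Returns, for each arriving event, the ids released by that arrival.
  def run(): Vector[Vector[String]] = {
    val buffer = new EventTimeBuffer(10000L) // 10 s bound, as in the thread
    val arrivals = Vector(
      Event("1.B", 2000L),  // B arrives first ...
      Event("1.A", 1000L),  // ... although A carries the earlier timestamp
      Event("2.A", 15000L)  // a later event that pushes the watermark forward
    )
    arrivals.map(e => buffer.add(e).map(_.id))
  }

  def main(args: Array[String]): Unit =
    run().foreach(released => println(released.mkString("[", ", ", "]")))
}
```

In this sketch nothing is released when 1.B and 1.A arrive, because the
watermark is still 10 seconds behind them. Only the arrival of 2.A moves the
watermark past both timestamps, and they are then released in timestamp order
(1.A, then 1.B). A smaller out-of-orderness bound would shorten the wait, at
the cost of treating more events as late.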

On Thu, Jan 4, 2018 at 3:53 PM, Aljoscha Krettek <aljoscha@apache.org>
wrote:

> I think this is actually working as intended, given your earlier description
> of when results are produced: when you see Event 1.B, the watermark has not
> advanced far enough to trigger computation. Only when you see Event 2.A
> does the watermark advance, and you get a result. This is what I would
> expect to happen.
>
>
> On 3. Jan 2018, at 19:46, shashank agarwal <shashank734@gmail.com> wrote:
>
> @Dawid, I was using 1.3.2. I have also checked on 1.4.0 and am still facing
> the same issue.
>
>
> @Aljoscha, I have to cover the case where B can come after A from Kafka.
> How can I achieve this, given that event time is not working? How should I
> implement this?
>
>  A followedBy B.
>
> As I am using a Kafka source and my event APIs sit behind load balancers,
> B sometimes arrives before A. So my CEP pattern doesn't generate any result
> for those events.
>
> I am trying to use event time like this. Am I doing anything wrong?
>
>
>  kafkaSource.assignTimestampsAndWatermarks(
>    new BoundedOutOfOrdernessTimestampExtractor[Event](Time.seconds(10)) {
>      // Prefer the origin timestamp, then the server timestamp, and fall
>      // back to the current wall-clock time if neither is usable.
>      override def extractTimestamp(event: Event): Long = {
>        try {
>          val originTime = event.origTimestamp.getOrElse("0").toLong
>          if (originTime > 0) originTime
>          else {
>            val serverTime = event.serverTimestamp.getOrElse("0").toLong
>            if (serverTime > 0) serverTime else System.currentTimeMillis()
>          }
>        } catch {
>          case e: Exception =>
>            // Assumes Log accepts (message, throwable), SLF4J-style.
>            Log.error("OriginTimestamp exception occurred", e)
>            System.currentTimeMillis()
>        }
>      }
>    }
>  )
>
> On Wed, Jan 3, 2018 at 9:42 PM, Dawid Wysakowicz <
> wysakowicz.dawid@gmail.com> wrote:
>
>> Hi shashank,
>>
>> What version of Flink are you using? Is it possible that you are hitting
>> this issue: https://issues.apache.org/jira/browse/FLINK-7563 ?
>>
>> The watermark semantics in CEP were buggy: an event was processed only if
>> its timestamp was lower than the current watermark, whereas it should be
>> lower or equal.
>>
>> Best
>> Dawid
>>
>> > On 3 Jan 2018, at 17:05, shashank agarwal <shashank734@gmail.com>
>> wrote:
>> >
>> > ssed A with origTimestamp Y. (
>>
>>
>
>
> --
> Thanks Regards
>
> SHASHANK AGARWAL
>  ---  Trying to mobilize the things....
>
>
>


-- 
Thanks Regards

SHASHANK AGARWAL
 ---  Trying to mobilize the things....
