From: Dawid Wysakowicz
Date: Mon, 5 Jun 2017 08:46:08 +0200
Subject: Re: Queries regarding FlinkCEP
To: Till Rohrmann
Cc: Biplob Biswas, user@flink.apache.org

I think Till answered all your questions, but just to rephrase a bit.

1. The within and TimeCharacteristic work on different levels. The TimeCharacteristic tells how events are assigned a timestamp. The within operator specifies the maximal time between the first and last event of a matched sequence (time here corresponds to the chosen TimeCharacteristic). So if we have within(Time.minutes(10)) in EventTime, upon Watermark arrival the events are sorted by their assigned timestamps and then the within condition is applied.

3. Looking at your code, there is nothing wrong with it. As I don't know what the timestamps of your events look like, I can only guess, but I would say either

- there are no matching sequences of events in your stream that fit into a 10-minute window, or
- your events are out of order by more than 60 seconds. Consider this example: we have events with timestamps {t1=600s, t2=620s, t3=550s}. The event with t3=550s cannot match with t1 because it lags 70s (> 60s) behind t2. FlinkCEP currently drops all late events.

For a deeper understanding of Event/Processing Time I would suggest having a look at:
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/event_time.html#event-time

Z pozdrowieniami! / Cheers!

Dawid Wysakowicz
*Data/Software Engineer*
Skype: dawid_wys | Twitter: @OneMoreCoder

2017-06-02 18:22 GMT+02:00 Till Rohrmann <trohrmann@apache.org>:
> Hi Biplob,
>
> 1. The CEPPatternOperator can use either processing time or event time for
> its internal processing logic. It depends only on which TimeCharacteristic
> you have set for your program. Consequently, with event time, your example
> should be detected as an alert.
>
> 2. If you don't provide a keyed input stream, then Flink will execute the
> CEP operator only with a parallelism of 1. Thus, all events pass through
> the same instance of the CEP operator.
>
> 3. It's hard to tell, but I would assume that something with the watermark
> generation does not work properly. For example, it could be that you've set
> the out-of-orderness to a very large value, so that it takes a long time
> until you can be sure that you've seen all events for a given watermark
> on an input without monotonically increasing timestamps. The easiest way
> to debug the problem would be a self-contained example program which
> reproduces the problem.
>
> Cheers,
> Till
>
> On Fri, Jun 2, 2017 at 1:10 PM, Biplob Biswas <revolutionisme@gmail.com>
> wrote:
>
>> Hi,
>>
>> Thanks a lot for the help last time. I have a few more questions, and I
>> chose to create a new topic as the problem in the previous topic was
>> solved, thanks to useful inputs from the Flink community. The questions
>> are as follows:
>>
>> *1.* Which time does the "within" operator work on, "Event Time" or
>> "Processing Time"? I am asking this as I wanted to know whether something
>> like the following would be captured or not.
>>
>> MaxOutOfOrderness is set to 10 mins, and "within" is specified for
>> 5 mins. So the first event's event time is at 1:00, and the corresponding
>> next event has an event time of 1:04, but it arrives in the system at
>> 1:06. Would this still be processed and an alert be generated or not?
>>
>> *2.* What would happen if I don't have a key to specify? The way two
>> events are correlated is by using the ctx of the first event and matching
>> some different id, so we can't group by some unique field. I tried a test
>> run without specifying a key and it apparently works. But how is the
>> shuffling done in this case?
>>
>> *3.* This is one of the major issues. I could use Event Time with an
>> ascending event time extractor for one of my Kafka topics because its
>> behavior is consistent. But when I added another topic to read from,
>> where the events are not in ascending order, using the ascending
>> timestamp extractor gave me a timestamp monotonicity violation. When I
>> then use a BoundedOutOfOrdernessTimestampExtractor instead, I don't get
>> any warnings anymore, but I also no longer get my alerts.
>>
>> If I go back to using processing time, then I get alerts properly again.
>> What could be the problem here?
>>
>> *This is the code I am using:*
>>
>> public class CEPForBAM {
>>
>>   public static void main(String[] args) throws Exception {
>>
>>     StreamExecutionEnvironment env =
>>         StreamExecutionEnvironment.getExecutionEnvironment();
>>     System.out.println(env.getStreamTimeCharacteristic());
>>     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>>     env.getConfig().setAutoWatermarkInterval(10000);
>>
>>     // configure Kafka consumer
>>     Properties props = new Properties();
>>     props = getDefaultProperties(props);
>>
>>     FlinkKafkaConsumer010<BAMEvent> kafkaSource = new FlinkKafkaConsumer010<>(
>>         Arrays.asList("topic1", "topic_x", "topic_test"),
>>         new StringSerializerToEvent(),
>>         props);
>>
>>     kafkaSource.assignTimestampsAndWatermarks(
>>         new BoundedOutOfOrdernessTimestampExtractor<BAMEvent>(Time.seconds(60)) {
>>
>>       private static final long serialVersionUID = -7228487240278428374L;
>>
>>       @Override
>>       public long extractTimestamp(BAMEvent event) {
>>         return event.getTimestamp();
>>       }
>>     });
>>
>>     DataStream<BAMEvent> events = env.addSource(kafkaSource);
>>
>>     // Input stream of monitoring events
>>
>>     /* DataStream<BAMEvent> partitionedInput = events
>>         .keyBy((KeySelector<BAMEvent, String>) BAMEvent::getId); */
>>
>>     events.print();
>>     // partitionedInput.print();
>>
>>     Pattern<BAMEvent, ?> pattern = Pattern.<BAMEvent>begin("first")
>>         .where(new SimpleCondition<BAMEvent>() {
>>           private static final long serialVersionUID = 1390448281048961616L;
>>
>>           @Override
>>           public boolean filter(BAMEvent event) throws Exception {
>>             return event.getEventName().equals(ReadEventType.class.getSimpleName());
>>           }
>>         })
>>         .followedBy("second")
>>         .where(new IterativeCondition<BAMEvent>() {
>>           private static final long serialVersionUID = -9216505110246259082L;
>>
>>           @Override
>>           public boolean filter(BAMEvent secondEvent, Context<BAMEvent> ctx)
>>               throws Exception {
>>             if (secondEvent.getEventName()
>>                 .equals(StatusChangedEventType.class.getSimpleName())) {
>>               for (BAMEvent firstEvent : ctx.getEventsForPattern("first")) {
>>                 if (secondEvent.getCorrelationID().contains(firstEvent.getEventId()))
>>                   return true;
>>               }
>>             }
>>             return false;
>>           }
>>         })
>>         .within(Time.minutes(10));
>>
>>     PatternStream<BAMEvent> patternStream = CEP.pattern(events, pattern);
>>
>>     DataStream<Either<String, String>> alerts = patternStream.select(
>>         new PatternTimeoutFunction<BAMEvent, String>() {
>>       private static final long serialVersionUID = -8717561187522704500L;
>>
>>       @Override
>>       public String timeout(Map<String, List<BAMEvent>> map, long l)
>>           throws Exception {
>>         return "TimedOut: " + map.toString() + " @ " + l;
>>       }
>>
>>     }, new PatternSelectFunction<BAMEvent, String>() {
>>       private static final long serialVersionUID = 3144439966791408980L;
>>
>>       @Override
>>       public String select(Map<String, List<BAMEvent>> pattern) throws Exception {
>>         BAMEvent bamEvent = pattern.get("first").get(0);
>>         return "Matched Events: " + bamEvent.getEventId() + "_" +
>>             bamEvent.getEventName();
>>       }
>>     });
>>
>>     alerts.print();
>>
>>     env.execute("CEP monitoring job");
>>   }
>> }
>>
>> Even when I am using Event Time, I am getting events from Kafka, as can
>> be seen from events.print().
>>
>> --
>> View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Queries-regarding-FlinkCEP-tp13454.html
>> Sent from the Apache Flink User Mailing List archive mailing list archive at Nabble.com.
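[Editor's note] Dawid's lateness arithmetic above ({t1=600s, t2=620s, t3=550s} with a 60s out-of-orderness bound) can be reproduced in plain Java, without Flink. The sketch below is a simplified model of how a bounded-out-of-orderness watermark classifies events as late; the class `WatermarkSketch` and method `processAndCheckLate` are invented for illustration and are not Flink APIs.

```java
/**
 * Simplified model: the watermark trails the highest timestamp
 * seen so far by a fixed out-of-orderness bound. Events at or
 * below the current watermark are treated as late (as FlinkCEP
 * drops late events). All timestamps are in seconds.
 */
public class WatermarkSketch {

    private final long maxOutOfOrderness;
    private long maxSeenTimestamp;

    public WatermarkSketch(long maxOutOfOrderness) {
        this.maxOutOfOrderness = maxOutOfOrderness;
        // Start low enough that nothing is late initially,
        // while avoiding long underflow in the subtraction below.
        this.maxSeenTimestamp = Long.MIN_VALUE + maxOutOfOrderness;
    }

    /** Processes one event; returns true if it would be dropped as late. */
    public boolean processAndCheckLate(long timestamp) {
        long watermark = maxSeenTimestamp - maxOutOfOrderness;
        boolean late = timestamp <= watermark;
        maxSeenTimestamp = Math.max(maxSeenTimestamp, timestamp);
        return late;
    }

    public static void main(String[] args) {
        WatermarkSketch wm = new WatermarkSketch(60);
        System.out.println(wm.processAndCheckLate(600)); // false
        System.out.println(wm.processAndCheckLate(620)); // false
        // Watermark is now 620 - 60 = 560, so t3=550 lags 70s behind t2:
        System.out.println(wm.processAndCheckLate(550)); // true
    }
}
```

This mirrors why t3=550s is dropped in Dawid's example: after t2=620s arrives, the watermark has advanced to 560s, past t3.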
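[Editor's note] The followedBy/within correlation in Biplob's pattern can likewise be mirrored in plain Java, to check by hand which event pairs should survive the 10-minute window. The `Event` class below is a simplified stand-in for BAMEvent, and the string event names stand in for ReadEventType/StatusChangedEventType; this brute-force matcher is only an illustration of the matching semantics, not how FlinkCEP is implemented.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Simplified stand-in for BAMEvent; field names are assumptions. */
class Event {
    final String name;          // "ReadEventType" or "StatusChangedEventType"
    final String id;            // event id (set on the first event)
    final String correlationId; // correlation id (set on the second event)
    final long timestamp;       // seconds

    Event(String name, String id, String correlationId, long timestamp) {
        this.name = name;
        this.id = id;
        this.correlationId = correlationId;
        this.timestamp = timestamp;
    }
}

public class WithinSketch {

    /**
     * Pairs a "first" (ReadEventType) with a later "second"
     * (StatusChangedEventType) whose correlation id contains the first
     * event's id, as long as they are at most windowSeconds apart.
     * This is a plain-Java mirror of followedBy(...).within(...).
     */
    static List<String> match(List<Event> events, long windowSeconds) {
        List<String> alerts = new ArrayList<>();
        for (Event first : events) {
            if (!first.name.equals("ReadEventType")) continue;
            for (Event second : events) {
                if (second.name.equals("StatusChangedEventType")
                        && second.timestamp >= first.timestamp
                        && second.timestamp - first.timestamp <= windowSeconds
                        && second.correlationId.contains(first.id)) {
                    alerts.add("Matched Events: " + first.id);
                }
            }
        }
        return alerts;
    }

    public static void main(String[] args) {
        List<Event> events = Arrays.asList(
            new Event("ReadEventType", "e1", "", 0),
            // correlated and 300s later: inside the window, matches
            new Event("StatusChangedEventType", "", "x_e1", 300),
            new Event("ReadEventType", "e2", "", 0),
            // correlated but 700s later: outside within(600s), no match
            new Event("StatusChangedEventType", "", "x_e2", 700));
        System.out.println(match(events, 600)); // prints [Matched Events: e1]
    }
}
```

Run against out-of-order input, this also makes Dawid's point concrete: a correlated pair only produces an alert if both events fall inside the within window measured in event time, regardless of arrival order.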