flink-user mailing list archives

From Aljoscha Krettek <aljos...@apache.org>
Subject Re: Generate Timestamps and emit Watermarks - unordered events - Kafka source
Date Fri, 21 Apr 2017 12:57:24 GMT
+Kostas and +Dawid

Could you please have a look? You two have worked on these parts most recently. I recall that there were some problems with event time and out-of-order processing in CEP in Flink 1.2.

Best,
Aljoscha
> On 19. Apr 2017, at 15:28, Luis Lázaro <lalazaro@keedio.com> wrote:
> 
> 
> Hi everyone,
> I am working on a use case with CEP and Flink:
> 
> Flink 1.2
> The source is Kafka, configured with a single partition.
> The data are standard syslog messages parsed as LogEntry (an object with attributes like timestamp, service, severity, etc.).
> An event is a LogEntry.
> If two consecutive LogEntry events with severity ERROR (3) and the same service are matched within a 10-minute period, an ErrorAlert must be triggered.
> 
> 
> Although I cannot guarantee the ascending order of events (LogEntry) when consuming from Kafka, I decided to try this implementation:
> Timestamps per Kafka partition <https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/event_timestamps_watermarks.html#timestamps-per-kafka-partition>
> 
> 
> //My events provide their own timestamps
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) 
> 
> //"Watermarks are generated inside the Kafka consumer, per Kafka partition":
> val kafkaSource: FlinkKafkaConsumer08[LogEntry] = new FlinkKafkaConsumer08[LogEntry](
>   parameterTool.getRequired("topic"),
>   new LogEntrySchema(parameterTool.getBoolean("parseBody", true)),
>   parameterTool.getProperties)
> 
> //may not be in ascending order
> val kafkaSourceAssignedTimestamp = kafkaSource.assignTimestampsAndWatermarks(
>   new AscendingTimestampExtractor[LogEntry] {
>     override def extractAscendingTimestamp(t: LogEntry): Long = {
>       ProcessorHelper.toTimestamp(t.timestamp).getTime
>     }
>   })
> 
> val stream: DataStream[LogEntry] = env.addSource(kafkaSourceAssignedTimestamp)
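> 
> As an aside: since order is not guaranteed, my understanding is that AscendingTimestampExtractor only logs a warning when a timestamp goes backwards, and the watermark keeps advancing. A small sketch that makes violations fail fast instead, assuming the withViolationHandler API (failFastSource is just an illustrative name):
> 
> // fail the job on the first out-of-order timestamp instead of logging it
> val failFastSource = kafkaSource.assignTimestampsAndWatermarks(
>   new AscendingTimestampExtractor[LogEntry] {
>     override def extractAscendingTimestamp(t: LogEntry): Long =
>       ProcessorHelper.toTimestamp(t.timestamp).getTime
>   }.withViolationHandler(new AscendingTimestampExtractor.FailingHandler))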
> 
>  I implemented a pattern like:
> 
> val myPattern =
>   Pattern
>     .begin[LogEntry]("First Event")
>     .subtype(classOf[LogEntry])
>     .where(event => event.severity == SyslogCode.numberOfSeverity("ERROR"))
>     .next("Second Event")
>     .subtype(classOf[LogEntry])
>     .where(event => event.severity == SyslogCode.numberOfSeverity("ERROR"))
>     .within(Time.minutes(10))
> 
>   This pattern will trigger an alert when two consecutive LogEntry events with severity ERROR and the same service are matched (alerts are generated for each service individually):
> 
>   CEP.pattern(stream.keyBy(_.service), myPattern)
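> 
> For reference, a sketch of the select function that turns each match into an alert string (the formatting is illustrative, not my exact code):
> 
> import org.apache.flink.cep.scala.CEP
> import org.apache.flink.streaming.api.scala._
> 
> val alerts: DataStream[String] =
>   CEP.pattern(stream.keyBy(_.service), myPattern)
>     .select(pattern => {
>       // one LogEntry per pattern state name
>       val first = pattern("First Event")
>       val second = pattern("Second Event")
>       s"ErrorAlert:\n${first.service}-${first.severity}-${first.timestamp}\n" +
>         s"${second.service}-${second.severity}-${second.timestamp}"
>     })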
> 
> 
> An alert is made of two LogEntry events:
> 
> ErrorAlert:
> service_name-ERROR-timestamp of first event
> service_name-ERROR-timestamp of second event
> 
> I am getting alerts like this:
> 
> ErrorAlert:
> service_2-3-2017-04-19 06:57:49
> service_2-3-2017-04-19 07:02:23
> 
> ErrorAlert:
> service_2-3-2017-04-19 07:32:37
> service_2-3-2017-04-19 07:34:06
> 
> ErrorAlert:
> service_1-3-2017-04-19 07:25:04
> service_1-3-2017-04-19 07:29:39
> 
> ErrorAlert:
> service_1-3-2017-04-19 07:29:39
> service_1-3-2017-04-19 07:30:37
> 
> ErrorAlert:
> service_3-3-2017-04-19 07:49:27
> service_3-3-2017-04-19 06:59:10  ---> oops!
> 
> ErrorAlert:
> service_2-3-2017-04-19 07:50:06
> service_2-3-2017-04-19 06:54:48  ---> oops!
> 
> ErrorAlert:
> service_2-3-2017-04-19 06:54:48
> service_2-3-2017-04-19 06:55:03
> 
> ErrorAlert:
> service_3-3-2017-04-19 07:21:11
> service_3-3-2017-04-19 07:24:52
> 
> ErrorAlert:
> service_1-3-2017-04-19 07:30:05
> service_1-3-2017-04-19 07:31:33
> 
> ErrorAlert:
> service_3-3-2017-04-19 07:08:31
> service_3-3-2017-04-19 07:17:42
> 
> ErrorAlert:
> service_1-3-2017-04-19 07:02:30
> service_1-3-2017-04-19 07:06:58
> 
> ErrorAlert:
> service_3-3-2017-04-19 07:03:50
> service_3-3-2017-04-19 07:11:48
> 
> ErrorAlert:
> service_3-3-2017-04-19 07:19:04
> service_3-3-2017-04-19 07:21:25
> 
> ErrorAlert:
> service_3-3-2017-04-19 07:33:13
> service_3-3-2017-04-19 07:38:47
> 
> 
> I also tried this approach:
> bounded out-of-orderness <https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/event_timestamp_extractors.html#assigners-allowing-a-fixed-amount-of-lateness>
> 
> kafkaSource.assignTimestampsAndWatermarks(
>   new BoundedOutOfOrdernessTimestampExtractor[LogEntry](Time.seconds(0)) {
>     override def extractTimestamp(t: LogEntry): Long = {
>       ProcessorHelper.toTimestamp(t.timestamp).getTime
>     }
>   })
> 
> Time.seconds(0) ---> if I set it like this, do I prevent events from being treated as delayed?
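> 
> (For comparison, a sketch with a non-zero bound; the 10 minutes is just a guess matching the pattern window. The watermark then trails the highest timestamp seen by that bound, so events arriving up to 10 minutes out of order are still ahead of the watermark:)
> 
> kafkaSource.assignTimestampsAndWatermarks(
>   new BoundedOutOfOrdernessTimestampExtractor[LogEntry](Time.minutes(10)) {
>     override def extractTimestamp(t: LogEntry): Long =
>       ProcessorHelper.toTimestamp(t.timestamp).getTime
>   })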
> 
> But I get the same problem as described above:
> 
> ……
> ErrorAlert:
> service_3-3-2017-04-19 07:49:27
> service_3-3-2017-04-19 06:59:10  ---> oops!
> 
> ErrorAlert:
> service_2-3-2017-04-19 07:50:06
> service_2-3-2017-04-19 06:54:48  ---> oops!
> …...
> 
> Initially I thought my pattern was not correctly implemented, but the problem seems to be that I am unable to assign timestamps, and consequently emit watermarks, properly when events are unordered.
> 
> Any suggestion is much appreciated; thanks in advance.
> 
> 
> Best regards, Luis
> 

