flink-user mailing list archives

From Luis Lázaro <lalaz...@keedio.com>
Subject Generate Timestamps and emit Watermarks - unordered events - Kafka source
Date Wed, 19 Apr 2017 13:28:18 GMT

Hi everyone,
I am working on a use case with CEP and Flink:

Flink 1.2
The source is Kafka, configured with a single partition.
The data are standard syslog messages parsed into LogEntry objects (with attributes such as timestamp, service, severity, etc.).
An event is a LogEntry.
If two consecutive LogEntry events with severity ERROR (3) and the same service are matched within a 10-minute period, an ErrorAlert must be triggered.
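
To make the rest of the mail easier to follow, this is roughly the shape of the types involved (simplified sketch; the real LogEntry has more syslog fields and the field types may differ):

// Simplified sketch of the domain types, not the real classes
case class LogEntry(timestamp: String, service: String, severity: String)
case class ErrorAlert(first: LogEntry, second: LogEntry)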


Although I cannot guarantee that the events (LogEntry) arrive in ascending order when consuming from Kafka, I decided to try this implementation:
Timestamps per Kafka partition <https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/event_timestamps_watermarks.html#timestamps-per-kafka-partition>


// My events provide their own timestamps
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

// "Watermarks are generated inside the Kafka consumer, per Kafka partition":
val kafkaSource: FlinkKafkaConsumer08[LogEntry] = new FlinkKafkaConsumer08[LogEntry](
  parameterTool.getRequired("topic"),
  new LogEntrySchema(parameterTool.getBoolean("parseBody", true)),
  parameterTool.getProperties)

// may not be in ascending order
val kafkaSourceAssignedTimestamp = kafkaSource.assignTimestampsAndWatermarks(
  new AscendingTimestampExtractor[LogEntry] {
    override def extractAscendingTimestamp(t: LogEntry): Long = {
      ProcessorHelper.toTimestamp(t.timestamp).getTime
    }
  })

val stream: DataStream[LogEntry] = env.addSource(kafkaSourceAssignedTimestamp)
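
As far as I understand, the AscendingTimestampExtractor treats a decreasing timestamp only as a monotony violation (logged by default) and the watermark keeps advancing past it, so out-of-order events are reported rather than handled. To make such violations visible or fatal, something like this should work (untested sketch, assuming the withViolationHandler option on AscendingTimestampExtractor):

// Same extractor as above, but failing fast on out-of-order timestamps
// instead of only logging them (sketch)
kafkaSource.assignTimestampsAndWatermarks(
  new AscendingTimestampExtractor[LogEntry] {
    override def extractAscendingTimestamp(t: LogEntry): Long =
      ProcessorHelper.toTimestamp(t.timestamp).getTime
  }.withViolationHandler(new AscendingTimestampExtractor.FailingHandler))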

 I implemented a pattern like:

val myPattern =
  Pattern
    .begin[LogEntry]("First Event")
    .subtype(classOf[LogEntry])
    .where(event => event.severity == SyslogCode.numberOfSeverity("ERROR"))
    .next("Second Event")
    .subtype(classOf[LogEntry])
    .where(event => event.severity == SyslogCode.numberOfSeverity("ERROR"))
    .within(Time.minutes(10))

This pattern should trigger an alert when two consecutive LogEntry events with severity ERROR arrive for the same service (alerts are generated per service individually):

CEP.pattern(
  stream.keyBy(_.service),
  myPattern)


An alert is made of two LogEntry events:

ErrorAlert:
service_name-ERROR-timestamp of first event
service_name-ERROR-timestamp of second event
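
The select step that builds these alerts is not shown above; roughly, it is something along these lines (simplified sketch, assuming the Flink 1.2 Scala CEP select that passes a mutable.Map keyed by the pattern names used above, and the usual org.apache.flink.streaming.api.scala._ import for the implicit TypeInformation):

import scala.collection.mutable

// Sketch: turn each matched pair into the textual alert shown below
val alerts: DataStream[String] =
  CEP.pattern(stream.keyBy(_.service), myPattern)
    .select { pattern: mutable.Map[String, LogEntry] =>
      val first = pattern("First Event")
      val second = pattern("Second Event")
      s"ErrorAlert:\n${first.service}-${first.severity}-${first.timestamp}\n" +
        s"${second.service}-${second.severity}-${second.timestamp}"
    }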

I am getting alerts like this:

ErrorAlert:
service_2-3-2017-04-19 06:57:49
service_2-3-2017-04-19 07:02:23

ErrorAlert:
service_2-3-2017-04-19 07:32:37
service_2-3-2017-04-19 07:34:06

ErrorAlert:
service_1-3-2017-04-19 07:25:04
service_1-3-2017-04-19 07:29:39

ErrorAlert:
service_1-3-2017-04-19 07:29:39
service_1-3-2017-04-19 07:30:37

ErrorAlert:
service_3-3-2017-04-19 07:49:27
service_3-3-2017-04-19 06:59:10  ---> oops!

ErrorAlert:
service_2-3-2017-04-19 07:50:06
service_2-3-2017-04-19 06:54:48  ---> oops!

ErrorAlert:
service_2-3-2017-04-19 06:54:48
service_2-3-2017-04-19 06:55:03

ErrorAlert:
service_3-3-2017-04-19 07:21:11
service_3-3-2017-04-19 07:24:52

ErrorAlert:
service_1-3-2017-04-19 07:30:05
service_1-3-2017-04-19 07:31:33

ErrorAlert:
service_3-3-2017-04-19 07:08:31
service_3-3-2017-04-19 07:17:42

ErrorAlert:
service_1-3-2017-04-19 07:02:30
service_1-3-2017-04-19 07:06:58

ErrorAlert:
service_3-3-2017-04-19 07:03:50
service_3-3-2017-04-19 07:11:48

ErrorAlert:
service_3-3-2017-04-19 07:19:04
service_3-3-2017-04-19 07:21:25

ErrorAlert:
service_3-3-2017-04-19 07:33:13
service_3-3-2017-04-19 07:38:47


I also tried this approach:
bounded out-of-orderness <https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/event_timestamp_extractors.html#assigners-allowing-a-fixed-amount-of-lateness>

kafkaSource.assignTimestampsAndWatermarks(
  new BoundedOutOfOrdernessTimestampExtractor[LogEntry](Time.seconds(0)) {
    override def extractTimestamp(t: LogEntry): Long = {
      ProcessorHelper.toTimestamp(t.timestamp).getTime
    }
  })

Time.seconds(0) -> if I set it like this, do I prevent the events from being delivered late?

But I get the same problem as described above:

……
ErrorAlert:
service_3-3-2017-04-19 07:49:27
service_3-3-2017-04-19 06:59:10  ---> oops!

ErrorAlert:
service_2-3-2017-04-19 07:50:06
service_2-3-2017-04-19 06:54:48  ---> oops!
…...

Initially I thought my pattern was not implemented correctly, but the problem seems to be that I am unable to assign timestamps, and consequently emit watermarks, properly when the events are unordered.
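
If I understand the bounded out-of-orderness extractor correctly, with Time.seconds(0) the watermark is simply the highest timestamp seen so far, so every out-of-order event is already behind the watermark when it arrives. The following sketch shows the same idea with a configurable bound, plus a debug print to measure how far behind the events actually are (the class name, the one-minute bound in the usage comment and the printing are only for illustration):

import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.watermark.Watermark

// Sketch: periodic assigner that tolerates a fixed amount of out-of-orderness
// and reports how late each event is, to help choose a realistic bound
class MeasuringWatermarkAssigner(maxOutOfOrdernessMs: Long)
    extends AssignerWithPeriodicWatermarks[LogEntry] {

  private var maxSeenTimestamp: Long = Long.MinValue + maxOutOfOrdernessMs

  override def extractTimestamp(element: LogEntry, previousElementTimestamp: Long): Long = {
    val ts = ProcessorHelper.toTimestamp(element.timestamp).getTime
    if (ts < maxSeenTimestamp) {
      println(s"event is ${maxSeenTimestamp - ts} ms behind the max timestamp seen so far")
    }
    maxSeenTimestamp = math.max(maxSeenTimestamp, ts)
    ts
  }

  // With maxOutOfOrdernessMs = 0 this equals the highest timestamp seen,
  // so every out-of-order event is immediately late
  override def getCurrentWatermark(): Watermark =
    new Watermark(maxSeenTimestamp - maxOutOfOrdernessMs)
}

// usage, e.g.:
// kafkaSource.assignTimestampsAndWatermarks(
//   new MeasuringWatermarkAssigner(Time.minutes(1).toMilliseconds))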

Any suggestion is much appreciated, thanks in advance.


Best regards, Luis

