From: Luis Lázaro
Subject: Generate Timestamps and emit Watermarks - unordered events - Kafka source
Date: Wed, 19 Apr 2017 15:28:18 +0200
To: user@flink.apache.org
Hi everyone, 
I am working on a use case with CEP and Flink:

  • Flink 1.2
  • Source is Kafka configured with a single partition.
  • Data are standard syslog messages parsed as LogEntry (an object with attributes like timestamp, service, severity, etc.); see the sketch after this list.
  • An event is a LogEntry.
  • If two consecutive LogEntry events with severity ERROR (3) and the same service are matched within a 10-minute period, an ErrorAlert must be triggered.
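
A minimal sketch of LogEntry as I use it here (simplified; the real class has more fields, and the exact field types are my own shorthand for this mail):

case class LogEntry(
  timestamp: String, // original syslog timestamp, later parsed to epoch millis via ProcessorHelper
  service: String,   // name of the emitting service, used for keying
  severity: String,  // syslog severity code, "3" = ERROR
  message: String)   // remaining message body (placeholder for the other attributes)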


Although I cannot guarantee the ascending order of events (LogEntry) when consuming from Kafka, I decided to try this implementation (timestamps per Kafka partition):


// My events provide their own timestamps
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

//"Watermarks are generated = inside the Kafka consumer, per Kafka partition":
val = kafkaSource: FlinkKafkaConsumer08[LogEntry] =3D new = FlinkKafkaConsumer08[LogEntry](
      = parameterTool.getRequired("topic"), new = LogEntrySchema(parameterTool.getBoolean("parseBody", = true)),
      = parameterTool.getProperties)

// may not be in ascending order
val kafkaSourceAssignedTimestamp = kafkaSource.assignTimestampsAndWatermarks(new AscendingTimestampExtractor[LogEntry] {
      override def extractAscendingTimestamp(t: LogEntry): Long = {
        ProcessorHelper.toTimestamp(t.timestamp).getTime
      }
    })

val stream: DataStream[LogEntry] = env.addSource(kafkaSourceAssignedTimestamp)

I implemented a pattern like this:

myPattern =
  Pattern
      .begin[LogEntry]("First Event")
      .subtype(classOf[LogEntry])
      .where(event => event.severity == SyslogCode.numberOfSeverity("ERROR"))
      .next("Second Event")
      .subtype(classOf[LogEntry])
      .where(event => event.severity == SyslogCode.numberOfSeverity("ERROR"))
      .within(Time.minutes(10))

This pattern will trigger an alert when two consecutive LogEntry events with severity ERROR and the same service are matched (alerts are generated for each service individually):

CEP.pattern(stream.keyBy(_.service), myPattern)
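
The alerts below are produced by a select step, roughly like this (a simplified sketch of what I do; the string formatting is abbreviated, and as far as I know the Flink 1.2 Scala select function receives a map from pattern name to the matched event):

// sketch, assuming the usual org.apache.flink.streaming.api.scala._ and org.apache.flink.cep.scala.CEP imports
val alerts: DataStream[String] =
  CEP.pattern(stream.keyBy(_.service), myPattern)
    .select { matched =>
      val first  = matched("First Event")
      val second = matched("Second Event")
      s"ErrorAlert:\n" +
        s"${first.service}-${first.severity}-${first.timestamp}\n" +
        s"${second.service}-${second.severity}-${second.timestamp}"
    }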


An alert is made of two LogEntry events:

ErrorAlert:
service_name-ERROR-timestamp first event
service_name-ERROR-timestamp second event

I am getting alerts like this:

ErrorAlert:
service_2-3-2017-04-19 06:57:49
service_2-3-2017-04-19 07:02:23

ErrorAlert:
service_2-3-2017-04-19 07:32:37
service_2-3-2017-04-19 07:34:06

ErrorAlert:
service_1-3-2017-04-19 07:25:04
service_1-3-2017-04-19 07:29:39

ErrorAlert:
service_1-3-2017-04-19 07:29:39
service_1-3-2017-04-19 07:30:37

ErrorAlert:
service_3-3-2017-04-19 07:49:27
service_3-3-2017-04-19 06:59:10  ---> oops!

ErrorAlert:
service_2-3-2017-04-19 07:50:06
service_2-3-2017-04-19 06:54:48  ---> oops!

ErrorAlert:
service_2-3-2017-04-19 06:54:48
service_2-3-2017-04-19 06:55:03

ErrorAlert:
service_3-3-2017-04-19 07:21:11
service_3-3-2017-04-19 07:24:52

ErrorAlert:
service_1-3-2017-04-19 07:30:05
service_1-3-2017-04-19 07:31:33

ErrorAlert:
service_3-3-2017-04-19 07:08:31
service_3-3-2017-04-19 07:17:42

ErrorAlert:
service_1-3-2017-04-19 07:02:30
service_1-3-2017-04-19 07:06:58

ErrorAlert:
service_3-3-2017-04-19 07:03:50
service_3-3-2017-04-19 07:11:48

ErrorAlert:
service_3-3-2017-04-19 07:19:04
service_3-3-2017-04-19 07:21:25

ErrorAlert:
service_3-3-2017-04-19 07:33:13
service_3-3-2017-04-19 07:38:47

I also tried this approach (bounded out-of-orderness):

kafkaSource.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[LogEntry](Time.seconds(0)) {
      override def extractTimestamp(t: LogEntry): Long = {
        ProcessorHelper.toTimestamp(t.timestamp).getTime
      }
    })

Time.seconds(0) -> if I set it like this, do I prevent the events from being delivered with delay?
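
For comparison, I understand that allowing some out-of-orderness would look like this (the 60 seconds is only an example value, not something I have tuned):

kafkaSource.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[LogEntry](Time.seconds(60)) {
      override def extractTimestamp(t: LogEntry): Long = {
        ProcessorHelper.toTimestamp(t.timestamp).getTime
      }
    })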

But I get the same problem as described above:

…
ErrorAlert:
service_3-3-2017-04-19 07:49:27
service_3-3-2017-04-19 06:59:10  ---> oops!

ErrorAlert:
service_2-3-2017-04-19 07:50:06
service_2-3-2017-04-19 06:54:48  ---> oops!
…

Initially I thought my pattern was not correctly implemented, but the problem seems to be that I am unable to assign timestamps, and consequently emit watermarks, properly when events are unordered.
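
As far as I understand, the bounded extractor is internally just a periodic assigner whose watermark trails the maximum timestamp seen so far by the configured bound, so with Time.seconds(0) any event whose timestamp is below the current maximum is already late. A sketch of that equivalent assigner, only to illustrate my understanding:

import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.watermark.Watermark

class LogEntryWatermarkAssigner(maxOutOfOrdernessMillis: Long)
    extends AssignerWithPeriodicWatermarks[LogEntry] {

  private var currentMaxTimestamp: Long = Long.MinValue + maxOutOfOrdernessMillis

  override def extractTimestamp(element: LogEntry, previousElementTimestamp: Long): Long = {
    val ts = ProcessorHelper.toTimestamp(element.timestamp).getTime
    currentMaxTimestamp = math.max(currentMaxTimestamp, ts)
    ts
  }

  // the watermark lags the largest timestamp seen by the allowed out-of-orderness
  override def getCurrentWatermark(): Watermark =
    new Watermark(currentMaxTimestamp - maxOutOfOrdernessMillis)
}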

Any suggestion is appreciated, thanks in advance.


Best regards, Luis
