From: Ajay Krishna <ajaykrishna@gmail.com>
Date: Thu, 28 Sep 2017 08:17:56 -0700
Subject: Re: Issue with CEP library
To: Kostas Kloudas <k.kloudas@data-artisans.com>
Cc: user@flink.apache.org

Hi Kostas,

Thank you for reaching out and for the suggestions. Here are the results:

1. Using an env parallelism of 1 performed similarly, with the additional problem that there was significant lag in the Kafka topic.
2. I removed the additional keyBy(0), but that did not change anything.
3. I also tried checking only for the start-only pattern, and it was exactly the same: I saw one of the homes going through but 3 others just getting dropped.
4. I also tried slowing down the rate into Kafka from 5000/second to about 1000/second, but I see similar results.

I was wondering if you had any other solutions to the problem. I am especially concerned about 1 and 3. Is this library under active development? Is there a JIRA open on this issue, and could we open one to track it?


I was reading on Stack Overflow and found a user who had a very similar issue in Aug '16. So I contacted him to discuss the issue and learned that the pattern of failure was exactly the same.

https://stackoverflow.com/questions/38870819/flink-cep-is-not-deterministic
Before I found the above post, I created a post for this issue:

https://stackoverflow.com/questions/46458873/flink-cep-not-recognizing-pattern

I would really appreciate your guidance on this.

Best regards,
Ajay




On Thu, Sep 28, 2017 at 1:38 AM, Kostas Kloudas <k.kloudas@data-artisans.com> wrote:
Hi Ajay,
I will look a bit more into the issue.

But in the meantime, could you run your job with a parallelism of 1, to see if the results are as expected?

Also, could you change the pattern, for example to check only for the start, to see if all keys pass through.

As for the code, you apply keyBy(0) to the cepMap stream twice, which is redundant and introduces latency.
You could remove that to also see the impact.
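
For example, a sketch based on the snippet quoted below, dropping only the second keyBy(0) since cepMapByHomeId is already keyed:

    // cepMapByHomeId is already keyed by the first tuple field (home_id):
    DataStream<Tuple8<Integer, Date, String, String, Float, Float, Float, Float>> cepMapByHomeId =
            cepMap.keyBy(0);

    // Pass the keyed stream directly, instead of CEP.pattern(cepMapByHomeId.keyBy(0), cep1):
    PatternStream<Tuple8<Integer, Date, String, String, Float, Float, Float, Float>> patternStream =
            CEP.pattern(cepMapByHomeId, cep1);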

Kostas

On Sep 28, 2017, at 2:57 AM, Ajay Krishna <ajaykrishna@gmail.com> wrote:
Hi,

I've been working with Flink for only the past 2 weeks on a project, and am trying out the CEP library on sensor data. I am using Flink version 1.3.2. Flink has a Kafka source; I am using KafkaSource9. I am running Flink on a 3-node AWS cluster with 8G of RAM, running Ubuntu 16.04. From the Flink dashboard, I see that I have 2 TaskManagers & 4 task slots.
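
(For context, a minimal sketch of what such a source can look like in Flink 1.3, assuming KafkaSource9 refers to the 0.9 Kafka consumer connector; the topic name and properties are assumptions, since the actual source setup is not shown in this thread:)

    // import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
    // import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "localhost:9092"); // assumption
    props.setProperty("group.id", "cep-demo");                // hypothetical group id
    DataStream<String> raw = env.addSource(
            new FlinkKafkaConsumer09<>("sensor-topic",        // hypothetical topic
                    new SimpleStringSchema(), props));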

What I observe is the following. The input to Kafka is a JSON string, and when parsed on the Flink side it looks like this:

(101,Sun Sep 24 23:18:53 UTC 2017,complex event,High,37.75142,-122.39458,12.0,20.0)

I use a Tuple8 to capture the parsed data. The first field is home_id. The time characteristic is set to EventTime, and I have an AscendingTimestampExtractor using the timestamp field. Parallelism for the execution environment is set to 4.
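
(A minimal sketch of that timestamp assignment, assuming the Date sits in field f1; the actual extractor is not shown in this thread:)

    // import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
    cepMap.assignTimestampsAndWatermarks(
            new AscendingTimestampExtractor<Tuple8<Integer, Date, String, String, Float, Float, Float, Float>>() {
                @Override
                public long extractAscendingTimestamp(
                        Tuple8<Integer, Date, String, String, Float, Float, Float, Float> element) {
                    // Event time from the record's Date field (assumed to be f1).
                    return element.f1.getTime();
                }
            });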
I have a rather simple event that I am trying to capture:

DataStream<Tuple8<Integer, Date, String, String, Float, Float, Float, Float>> cepMapByHomeId = cepMap.keyBy(0);

//cepMapByHomeId.print();

Pattern<Tuple8<Integer, Date, String, String, Float, Float, Float, Float>, ?> cep1 =
        Pattern.<Tuple8<Integer, Date, String, String, Float, Float, Float, Float>>begin("start")
                .where(new OverLowThreshold())
                .followedBy("end")
                .where(new OverHighThreshold());

PatternStream<Tuple8<Integer, Date, String, String, Float, Float, Float, Float>> patternStream =
        CEP.pattern(cepMapByHomeId.keyBy(0), cep1);

DataStream<Tuple7<Integer, Date, Date, String, String, Float, Float>> alerts =
        patternStream.select(new PackageCapturedEvents());

The pattern checks whether the 7th field in the Tuple8 goes over 12 and then over 16. The output of the pattern looks like this:

(201,Tue Sep 26 14:56:09 UTC 2017,Tue Sep 26 15:11:59 UTC 2017,complex event,Non-event,37.75837,-122.41467)
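
(For completeness, a hedged sketch of what the two conditions could look like, using the thresholds from the sentence above; the actual OverLowThreshold/OverHighThreshold classes are not shown in this thread:)

    // import org.apache.flink.cep.pattern.conditions.SimpleCondition;
    public static class OverLowThreshold
            extends SimpleCondition<Tuple8<Integer, Date, String, String, Float, Float, Float, Float>> {
        @Override
        public boolean filter(Tuple8<Integer, Date, String, String, Float, Float, Float, Float> event) {
            return event.f6 > 12.0f; // 7th field (f6) over the low threshold
        }
    }

    public static class OverHighThreshold
            extends SimpleCondition<Tuple8<Integer, Date, String, String, Float, Float, Float, Float>> {
        @Override
        public boolean filter(Tuple8<Integer, Date, String, String, Float, Float, Float, Float> event) {
            return event.f6 > 16.0f; // 7th field (f6) over the high threshold
        }
    }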

On the Kafka producer side, I am trying to send simulated data for around 100 homes, so the home_id goes from 0-100 and the input is keyed by home_id. I have about 10 partitions in Kafka. The producer just loops through a csv file with a delay of about 100 ms between 2 rows of the file. The data is exactly the same for all 100 of the csv files except for the home_id and the lat & long information. The timestamp is incremented in steps of 1 sec. I start multiple processes to simulate data from different homes.
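
(Likewise, a minimal sketch of such a producer loop; the file name, topic, key, and serializers are assumptions, since the actual producer is not shown in this thread:)

    import java.nio.file.Files;
    import java.nio.file.Paths;
    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class SimulatedHomeProducer {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092"); // assumption
            props.put("key.serializer",
                    "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer",
                    "org.apache.kafka.common.serialization.StringSerializer");
            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                // One csv file per home; each record keyed by home_id (names hypothetical).
                for (String row : Files.readAllLines(Paths.get("home_42.csv"))) {
                    producer.send(new ProducerRecord<>("sensor-topic", "42", row));
                    Thread.sleep(100); // ~100 ms between rows, as described above
                }
            }
        }
    }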

THE PROBLEM:

Flink completely misses capturing events for a large subset of the input data. I barely see the events for about 4-5 of the home_id values. I do a print before applying the pattern and after, and I see all home_ids before and only a tiny subset after. Since the data is exactly the same, I expect all home_ids to be captured and written to my sink, which is Cassandra in this case. I've looked through all available docs and examples but cannot seem to find a fix for the problem.

I would really appreciate some guidance on how to understand and fix this.


Thank you,

Ajay

