Date: Tue, 29 Aug 2017 10:07:00 +0000 (UTC)
From: "Paolo Rendano (JIRA)"
To: issues@flink.apache.org
Subject: [jira] [Updated] (FLINK-7549) CEP - Pattern not discovered if source streaming is very fast

     [ https://issues.apache.org/jira/browse/FLINK-7549?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Paolo Rendano updated FLINK-7549:
---------------------------------
    Description: 
Hi all,
I'm doing some stress tests on my pattern, using JMeter to populate the source data on a RabbitMQ queue. This queue contains statuses generated by different devices. In my test case I set up a loop of 1000 cycles, each one sending, in order, the first and the second status that together generate the event in Flink CEP (statuses are keyed by device).
In my early test runs I noticed that I get only partial results in the output (70-80% of the expected ones). Introducing a delay in the JMeter plan between sending the two statuses solved the problem. The minimum delay that makes things work is 20-25 ms (this is on my local machine, of course; it may vary on other machines).
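To see exactly which devices lose their match in a run like the one described here, the device codes sent by the load test can be compared with the device codes of the events that reach the sink. A minimal sketch, assuming each cycle uses its own device code and that both lists have already been collected (for example from the JMeter plan and from a consumer on the output exchange); `MatchReport`, its methods, and the `device-N` codes are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical helper: compares devices sent by the load test with devices that produced an event. */
public class MatchReport {

    /** Devices that were sent but never appear in the output, in sending order. */
    public static List<String> missing(List<String> sentDevices, List<String> matchedDevices) {
        Set<String> matched = new LinkedHashSet<>(matchedDevices);
        List<String> result = new ArrayList<>();
        for (String device : new LinkedHashSet<>(sentDevices)) { // de-duplicate, keep order
            if (!matched.contains(device)) {
                result.add(device);
            }
        }
        return result;
    }

    /** Fraction of distinct sent devices that produced at least one event. */
    public static double matchRate(List<String> sentDevices, List<String> matchedDevices) {
        int sent = new LinkedHashSet<>(sentDevices).size();
        if (sent == 0) {
            return 1.0;
        }
        return (sent - missing(sentDevices, matchedDevices).size()) / (double) sent;
    }

    public static void main(String[] args) {
        List<String> sent = new ArrayList<>();
        for (int i = 0; i < 1000; i++) {
            sent.add("device-" + i); // hypothetical device codes, one per cycle
        }
        // Hypothetical output in which only every other device matched.
        List<String> matched = new ArrayList<>();
        for (int i = 0; i < 1000; i += 2) {
            matched.add("device-" + i);
        }
        System.out.println(matchRate(sent, matched));      // prints 0.5
        System.out.println(missing(sent, matched).get(0)); // prints device-1
    }
}
```

With 1000 cycles, a rate between 0.7 and 0.8 corresponds to the partial results reported above; the `missing` list names the keys whose messages are worth inspecting in the `extractTimestamp` logs.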
My code is structured this way (the following is a simplification):
{code:java}
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().setAutoWatermarkInterval(100L);

// SOURCE DEFINITION
DataStream<MyMessageWrapper> dataStreamSource =
        env.addSource(new MYRMQAutoboundQueueSource<>(connectionConfig,
                conf.getSourceExchange(),
                conf.getSourceRoutingKey(),
                conf.getSourceQueueName(),
                true,
                new MyMessageWrapperSchema()))
           .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<MyMessageWrapper>(Time.minutes(1)) {
               private static final long serialVersionUID = -1L;

               @Override
               public long extractTimestamp(MyMessageWrapper element) {
                   if (element.getData().get("stateTimestamp") == null) {
                       throw new RuntimeException("Status Timestamp is null during time ordering for device [" + element.getData().get("deviceCode") + "]");
                   }
                   return FlinkUtils.getTimeFromJsonTimestamp(element.getData().get("stateTimestamp")).getTime();
               }
           })
           .name("MyIncomingStatus");

// PATTERN DEFINITION
Pattern<MyMessageWrapper, ?> myPattern = Pattern
        .<MyMessageWrapper>begin("start")
            .subtype(MyMessageWrapper.class)
            .where(whereEquals("st", "none"))
        .next("end")
            .subtype(MyMessageWrapper.class)
            .where(whereEquals("st", "started"))
        .within(Time.minutes(3));

// CEP DEFINITION
PatternStream<MyMessageWrapper> myPatternStream = CEP.pattern(dataStreamSource.keyBy(keySelector), myPattern);
DataStream<Either<...>> outputStream = myPatternStream.flatSelect(patternFlatTimeoutFunction, patternFlatSelectFunction);

// SINK DEFINITION
outputStream.addSink(new MYRMQExchangeSink<>(connectionConfig, outputExchange, new MyMessageWrapperSchema())).name("MyGeneratedEvent");
{code}
Digging in and logging the messages received by Flink in "extractTimestamp", I found that at such a high message rate the source may receive messages with the same timestamp but with different deviceCode values.
Any idea?
Thanks, regards
Paolo


> CEP - Pattern not discovered if source streaming is very fast
> -------------------------------------------------------------
>
>                 Key: FLINK-7549
>                 URL: https://issues.apache.org/jira/browse/FLINK-7549
>             Project: Flink
>          Issue Type: Bug
>          Components: CEP
>    Affects Versions: 1.3.1, 1.3.2
>            Reporter: Paolo Rendano
>



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
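The job above depends on two event-time mechanisms: the BoundedOutOfOrdernessTimestampExtractor, whose periodic watermark (every 100 ms here, via setAutoWatermarkInterval) trails the highest timestamp seen by the configured bound, and the CEP operator, which in event time buffers elements until the watermark passes them and then evaluates them per key in timestamp order. The following is a simplified, single-threaded model of those two steps, useful for reasoning about what equal timestamps across different deviceCode keys mean for the `start` / `next("end")` pattern. It is not Flink's implementation: `EventTimeCepModel`, `Status`, and the device codes are hypothetical, watermarks are triggered by hand, ties on equal timestamps are broken by arrival order (a stable sort), and `within` is modeled as a strict less-than on the distance between the two events.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/** Simplified model (not Flink's code): bounded-out-of-orderness watermark + per-key strict "none" -> "started". */
public class EventTimeCepModel {

    /** Hypothetical stand-in for MyMessageWrapper: device key, "st" value, event timestamp in ms. */
    public static final class Status {
        public final String deviceCode;
        public final String st;
        public final long timestamp;

        public Status(String deviceCode, String st, long timestamp) {
            this.deviceCode = deviceCode;
            this.st = st;
            this.timestamp = timestamp;
        }
    }

    private final long maxOutOfOrdernessMs;
    private final long withinMs;
    private long currentMaxTimestamp;
    private final Map<String, List<Status>> buffered = new HashMap<>(); // per key, not yet released
    private final Map<String, Status> lastReleased = new HashMap<>();   // per key, for strict contiguity
    private final List<String> matchedKeys = new ArrayList<>();

    public EventTimeCepModel(long maxOutOfOrdernessMs, long withinMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
        this.withinMs = withinMs;
        // Start low enough that the first watermark computation does not underflow.
        this.currentMaxTimestamp = Long.MIN_VALUE + maxOutOfOrdernessMs;
    }

    /** Watermark = highest timestamp seen so far minus the out-of-orderness bound. */
    public long currentWatermark() {
        return currentMaxTimestamp - maxOutOfOrdernessMs;
    }

    /** Tracks the maximum timestamp and buffers the event under its key. */
    public void onEvent(Status e) {
        currentMaxTimestamp = Math.max(currentMaxTimestamp, e.timestamp);
        buffered.computeIfAbsent(e.deviceCode, k -> new ArrayList<>()).add(e);
    }

    /** Releases, per key and in timestamp order, every buffered event at or below the watermark. */
    public void onWatermark(long watermark) {
        for (Map.Entry<String, List<Status>> entry : buffered.entrySet()) {
            List<Status> events = entry.getValue();
            // List.sort is stable: in this model equal timestamps keep their arrival order.
            events.sort(Comparator.comparingLong((Status s) -> s.timestamp));
            Iterator<Status> it = events.iterator();
            while (it.hasNext()) {
                Status e = it.next();
                if (e.timestamp > watermark) {
                    break; // list is sorted, so all remaining events are newer too
                }
                it.remove();
                match(entry.getKey(), e);
            }
        }
    }

    /** Strict contiguity: "started" must directly follow "none" for the same key, inside the window. */
    private void match(String key, Status e) {
        Status prev = lastReleased.put(key, e);
        if (prev != null && "none".equals(prev.st) && "started".equals(e.st)
                && e.timestamp - prev.timestamp < withinMs) {
            matchedKeys.add(key);
        }
    }

    public List<String> matchedKeys() {
        return Collections.unmodifiableList(matchedKeys);
    }

    public static void main(String[] args) {
        // 1-minute out-of-orderness and 3-minute window, as in the job above.
        EventTimeCepModel model = new EventTimeCepModel(60_000L, 180_000L);
        long t = 1_000L;
        // Two devices whose statuses all share one timestamp.
        model.onEvent(new Status("dev-1", "none", t));
        model.onEvent(new Status("dev-2", "none", t));
        model.onEvent(new Status("dev-1", "started", t));
        model.onEvent(new Status("dev-2", "started", t));
        model.onWatermark(model.currentWatermark());    // watermark is 1 minute behind: nothing released
        System.out.println(model.matchedKeys());         // prints []
        model.onWatermark(Long.MAX_VALUE);               // e.g. end of input flushes everything
        System.out.println(model.matchedKeys().size());  // prints 2
    }
}
```

In this model both devices match even though all four statuses share a single timestamp, because each key is ordered and matched independently; nothing is emitted until the watermark, one minute behind the newest event, has passed the buffered statuses.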