flink-issues mailing list archives

From "Kostas Kloudas (JIRA)" <j...@apache.org>
Subject [jira] [Comment Edited] (FLINK-7549) CEP - Pattern not discovered if source streaming is very fast
Date Thu, 05 Oct 2017 12:54:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-7549?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16192823#comment-16192823 ]

Kostas Kloudas edited comment on FLINK-7549 at 10/5/17 12:53 PM:
-----------------------------------------------------------------

Hi [~info@paolorendano.it],

Is this issue still valid, or has it been resolved by setting the {{timeCharacteristic}} to event time?
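For reference, a Flink 1.3 job runs in processing time unless {{env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)}} is called, and the job quoted in the issue below does not call it. In event time, the {{BoundedOutOfOrdernessTimestampExtractor}} used there decides when elements become processable: its watermark trails the highest timestamp seen so far by the configured bound ({{Time.minutes(1)}}), and it is emitted periodically (every 100 ms, per {{setAutoWatermarkInterval(100L)}}). The sketch below models only that arithmetic in plain Java, with no Flink dependency; the class and method names are hypothetical, and it is not Flink's own source.

```java
// Plain-Java model (no Flink dependency) of the watermark arithmetic of a
// bounded-out-of-orderness timestamp extractor.
// Hypothetical names: WatermarkModel, onElement, currentWatermark, isReleased.
class WatermarkModel {
    private final long maxOutOfOrdernessMs;
    private long currentMaxTimestamp;

    WatermarkModel(long maxOutOfOrdernessMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
        // Start low enough that currentWatermark() cannot underflow before the first element.
        this.currentMaxTimestamp = Long.MIN_VALUE + maxOutOfOrdernessMs;
    }

    // Called once per element with its event timestamp in milliseconds.
    void onElement(long eventTimestampMs) {
        currentMaxTimestamp = Math.max(currentMaxTimestamp, eventTimestampMs);
    }

    // The watermark trails the highest timestamp seen so far by the bound.
    long currentWatermark() {
        return currentMaxTimestamp - maxOutOfOrdernessMs;
    }

    // In this model an element counts as released once the watermark reaches its timestamp.
    boolean isReleased(long eventTimestampMs) {
        return eventTimestampMs <= currentWatermark();
    }
}
```

With a 60 000 ms bound, an element stamped 1 020 ms is released in this model only after some element stamped 61 020 ms or later has been seen, so in event time a device's match would be held back until the source has moved roughly a minute further on.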


was (Author: kkl0u):
Hi [~info@paolorendano.it],

Is this issue still valid, or it is resolved by setting the [[timeCharacteristic]] to event
time?

> CEP - Pattern not discovered if source streaming is very fast
> -------------------------------------------------------------
>
>                 Key: FLINK-7549
>                 URL: https://issues.apache.org/jira/browse/FLINK-7549
>             Project: Flink
>          Issue Type: Bug
>          Components: CEP
>    Affects Versions: 1.3.1, 1.3.2
>            Reporter: Paolo Rendano
>
> Hi all,
> I'm running some stress tests on my pattern, using JMeter to populate the source data on a RabbitMQ queue. The queue contains statuses generated by different devices. In my test case I set JMeter to loop 1000 times; each iteration sends the first and then the second status which together should generate an event through Flink CEP (statuses are keyed by device). I expect 1000 events in the output.
> In my first tests I got only partial results in the output (70 to 80% of the expected events). Introducing a delay in the JMeter plan between sending the two statuses solved the problem. The minimum delay that makes things work is 20 to 25 ms (measured on my local machine; it may vary on other machines).
> My code is structured this way (the following is a simplification):
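> {code:java}
> import org.apache.flink.cep.CEP;
> import org.apache.flink.cep.PatternStream;
> import org.apache.flink.cep.pattern.Pattern;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
> import org.apache.flink.streaming.api.windowing.time.Time;
> import org.apache.flink.types.Either;
> // MyMessageWrapper, MyMessageWrapperSchema, MYRMQAutoboundQueueSource, MYRMQExchangeSink, FlinkUtils,
> // TimeoutEvent, whereEquals, keySelector, the two pattern functions and the config objects are application-specific.
>
> final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> // emit periodic watermarks every 100 ms
> env.getConfig().setAutoWatermarkInterval(100L);
> // SOURCE DEFINITION
> DataStream<MyMessageWrapper> dataStreamSource =
>         env.addSource(new MYRMQAutoboundQueueSource<>(connectionConfig,
>                 conf.getSourceExchange(),
>                 conf.getSourceRoutingKey(),
>                 conf.getSourceQueueName(),
>                 true,
>                 new MyMessageWrapperSchema()))
>                 // event timestamps come from the "stateTimestamp" field; out-of-orderness bound: 1 minute
>                 .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<MyMessageWrapper>(Time.minutes(1)) {
>                     private static final long serialVersionUID = -1L;
>
>                     @Override
>                     public long extractTimestamp(MyMessageWrapper element) {
>                         if (element.getData().get("stateTimestamp") == null) {
>                             throw new RuntimeException("Status Timestamp is null during time ordering for device ["
>                                     + element.getData().get("deviceCode") + "]");
>                         }
>                         return FlinkUtils.getTimeFromJsonTimestamp(element.getData().get("stateTimestamp")).getTime();
>                     }
>                 })
>                 .name("MyIncomingStatus");
> // PATTERN DEFINITION: a "none" status immediately followed by a "started" status, within 3 minutes
> Pattern<MyMessageWrapper, ?> myPattern = Pattern
>         .<MyMessageWrapper>begin("start")
>             .subtype(MyMessageWrapper.class)
>             .where(whereEquals("st", "none"))
>         .next("end")
>             .subtype(MyMessageWrapper.class)
>             .where(whereEquals("st", "started"))
>         .within(Time.minutes(3));
> // CEP DEFINITION: the pattern is matched per device (keyed stream)
> PatternStream<MyMessageWrapper> myPatternStream = CEP.pattern(dataStreamSource.keyBy(keySelector), myPattern);
> DataStream<Either<TimeoutEvent, MyMessageWrapper>> outputStream =
>         myPatternStream.flatSelect(patternFlatTimeoutFunction, patternFlatSelectFunction);
> // SINK DEFINITION
> outputStream.addSink(new MYRMQExchangeSink<>(connectionConfig, outputExchange,
>         new MyMessageWrapperSchema())).name("MyGeneratedEvent");
> {code}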
> Digging in and logging the messages Flink receives in {{extractTimestamp}}, I found that at such a high message rate the source may receive messages with the same timestamp but with different {{deviceCode}} values.
> Any idea?
> Thanks, regards
> Paolo
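The observation about equal timestamps can be reasoned about with a simplified model. The Flink CEP documentation describes event-time processing as putting incoming elements in a buffer sorted by timestamp and processing the buffered elements when a watermark arrives; because the stream is keyed, each device has its own buffer and its own partial matches. The plain-Java sketch below models only that behaviour for the quoted pattern (a "none" status immediately followed by a "started" one, as with {{next()}}). It ignores {{within(...)}}, assumes that equal timestamps within one device keep their arrival order, and all class and method names are hypothetical; it is not Flink's operator implementation.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Simplified, Flink-free model of keyed event-time matching of "none" -> "started".
// Hypothetical names throughout; not Flink's CEP operator.
class KeyedSequenceModel {

    // Hypothetical status event: device key, event timestamp (ms) and the "st" field.
    static final class Status {
        final String deviceCode;
        final long timestamp;
        final String st;

        Status(String deviceCode, long timestamp, String st) {
            this.deviceCode = deviceCode;
            this.timestamp = timestamp;
            this.st = st;
        }
    }

    // Per-device buffer of elements not yet covered by the watermark.
    private final Map<String, List<Status>> buffers = new HashMap<>();
    // Per-device last processed status, used for strict contiguity (like next()).
    private final Map<String, String> lastStatus = new HashMap<>();
    private int matches = 0;

    void add(Status status) {
        buffers.computeIfAbsent(status.deviceCode, k -> new ArrayList<>()).add(status);
    }

    // Processes, per device and in timestamp order, every buffered element
    // whose timestamp is at or below the watermark.
    void advanceWatermark(long watermark) {
        for (Map.Entry<String, List<Status>> entry : buffers.entrySet()) {
            List<Status> buffer = entry.getValue();
            // List.sort is stable, so equal timestamps keep their arrival order.
            buffer.sort(Comparator.comparingLong((Status x) -> x.timestamp));
            Iterator<Status> it = buffer.iterator();
            while (it.hasNext()) {
                Status s = it.next();
                if (s.timestamp > watermark) {
                    break; // the remaining elements are still ahead of the watermark
                }
                String previous = lastStatus.put(entry.getKey(), s.st);
                if ("none".equals(previous) && "started".equals(s.st)) {
                    matches++;
                }
                it.remove();
            }
        }
    }

    int matches() {
        return matches;
    }
}
```

In this model, two devices that share timestamps never interfere with each other, and a device whose "started" status arrives before its "none" status is still matched once the watermark passes both. The model says nothing about processing-time behaviour.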



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
