flink-issues mailing list archives

From "Lucas Resch (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-9547) CEP pattern not called on windowed stream
Date Fri, 08 Jun 2018 07:47:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-9547?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16505799#comment-16505799 ]

Lucas Resch commented on FLINK-9547:
------------------------------------

[~dawidwys] I tried both suggestions you made:
h5. Run with {{ProcessingTime}}

The issue does not occur, as you expected.
h5. Implement Watermarking

In my original code (not the artificial example) we already use a custom watermark
and timestamp assigner. It is defined like this:
{code:java}
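public class CustomWatermarkEmitter<T> implements AssignerWithPunctuatedWatermarks<T> {

    // Emit a watermark after every element, equal to that element's extracted timestamp.
    @Nullable
    @Override
    public Watermark checkAndGetNextWatermark(T logEvent, long extractedTimestamp) {
        return new Watermark(extractedTimestamp);
    }

    // Use the send timestamp carried by the event as its event time.
    @Override
    public long extractTimestamp(T logEvent, long previousElementTimestamp) {
        return ((RobotData) logEvent).getSendTimeStamp();
    }

}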
{code}
Therefore I tried adding a simple assigner to the artificial example to see whether it fixes
the problem. It behaves the same as without one. I defined it like this:
{code:java}
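objectDataStreamSource.assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<Integer>() {

    // Emit a watermark after every element, equal to that element's extracted timestamp.
    @Override
    public Watermark checkAndGetNextWatermark(Integer integer, long extractedTimestamp) {
        return new Watermark(extractedTimestamp);
    }

    // Use the current wall-clock time as the element's timestamp.
    @Override
    public long extractTimestamp(Integer integer, long previousElementTimestamp) {
        return new Date().getTime();
    }

});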
{code}

h5. Regarding emitting enough events
I would expect that a sliding count window of size 2 with a slide of 1 needs at least 3 events
to close once. Is that assumption wrong?
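
To make that assumption checkable, here is a minimal plain-Java sketch (no Flink dependency) of one possible reading of a count window of size 2 with a slide of 1: it fires after every {{slide}} elements and passes on at most the last {{size}} elements. The class {{CountWindowSketch}} and its {{simulate}} method are hypothetical names for illustration, and whether {{countWindowAll(2, 1)}} behaves exactly like this model is an assumption, not something confirmed in this thread.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical model of a sliding count window; not Flink code.
public class CountWindowSketch {

    // Feeds the events 1..events into the model and returns the contents
    // handed to the window function at each firing.
    public static List<List<Integer>> simulate(int size, int slide, int events) {
        List<List<Integer>> firings = new ArrayList<>();
        Deque<Integer> buffer = new ArrayDeque<>();
        int sinceLastFire = 0;
        for (int e = 1; e <= events; e++) {
            buffer.addLast(e);
            // Keep at most the last `size` elements.
            while (buffer.size() > size) {
                buffer.removeFirst();
            }
            sinceLastFire++;
            // Assumption: the window fires after every `slide` elements,
            // even if fewer than `size` elements have arrived so far.
            if (sinceLastFire == slide) {
                firings.add(new ArrayList<>(buffer));
                sinceLastFire = 0;
            }
        }
        return firings;
    }

    public static void main(String[] args) {
        // Size 2, slide 1, three events.
        System.out.println(simulate(2, 1, 3)); // prints [[1], [1, 2], [2, 3]]
    }
}
```

Under this model, 3 events would produce three firings, the first with a single element. If the window instead only fired once it was full, the same input would produce two firings.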

> CEP pattern not called on windowed stream
> -----------------------------------------
>
>                 Key: FLINK-9547
>                 URL: https://issues.apache.org/jira/browse/FLINK-9547
>             Project: Flink
>          Issue Type: Bug
>          Components: CEP
>    Affects Versions: 1.3.2, 1.5.0
>            Reporter: Lucas Resch
>            Priority: Major
>
> When trying to match a pattern on a stream that was windowed the pattern will not be called. The following shows example code where the issue was noticed:
> {code:java}
> // Set up stream
> SingleOutputStreamOperator<ForceZ> forces = ...
>         .filter(new FilterForcesFunction())
>         .process(new ProcessForcesFunction());
> // Define mock pattern
> Pattern<ForceZ, ?> forcesMock = Pattern.<ForceZ>begin("start").where(new SimpleCondition<ForceZ>() {
>     @Override
>     public boolean filter(ForceZ value) {
>         // This is called as expected
>         return true;
>     }
> });
> // Print pattern results
> // This actually prints all incoming events as expected
> CEP.pattern(forces, forcesMock)
>         .select(new PatternSelectFunction<ForceZ, ForceZ>() {
>             @Override
>             public ForceZ select(Map<String, List<ForceZ>> pattern){
>                 return pattern.get("start").get(0);
>             }
>         }).print();
> // Create another stream based on a sliding window over the input stream
> SingleOutputStreamOperator<Interval> intervals = forces
>         .countWindowAll(2, 1)
>         .process(new ForceWindowFunction());
> // Define mock pattern
> Pattern<Interval, Interval> intervalMock = Pattern.<Interval>begin("start").where(new SimpleCondition<Interval>() {
>     @Override
>     public boolean filter(Interval value) throws Exception {
>         // This is never called
>         return true;
>     }
> });
> // Print pattern results
> // Doesn't print anything since the mock condition is never called
> CEP.pattern(intervals, intervalMock)
>         .select(new PatternSelectFunction<Interval, Interval>() {
>             @Override
>             public Interval select(Map<String, List<Interval>> pattern) throws Exception {
>                 return pattern.get("start").get(0);
>             }
>         }).print();
> {code}
> Either I'm doing something wrong or this is a major bug.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
