flink-issues mailing list archives

From "Lucas Resch (JIRA)" <j...@apache.org>
Subject [jira] [Comment Edited] (FLINK-9547) CEP pattern not called on windowed stream
Date Thu, 07 Jun 2018 18:07:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-9547?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16505021#comment-16505021 ]

Lucas Resch edited comment on FLINK-9547 at 6/7/18 6:06 PM:
------------------------------------------------------------

[~dawidwys] I created a small example that does something similar. Its behavior differs
from my original report, though: now the initial pattern is not called, while the pattern
on the windowed stream is. Something is definitely wrong.

 
{code:java}
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
DataStreamSource<Integer> objectDataStreamSource = env.fromElements(
        1, 2, 3, 4, 5, 6, 7, 8, 9, 10
);

SingleOutputStreamOperator<Long> forces = objectDataStreamSource
        .filter((FilterFunction<Integer>) Objects::nonNull)
        .process(new ProcessFunction<Integer, Long>() {
            @Override
            public void processElement(Integer value, Context ctx, Collector<Long> out) throws Exception {
                out.collect(value.longValue());
            }
        });

Pattern<Long, Long> forcesMock = Pattern.<Long>begin("start").where(new SimpleCondition<Long>() {
    @Override
    public boolean filter(Long value) {
        return true;
    }
});

CEP.pattern(forces, forcesMock)
        .select(new PatternSelectFunction<Long, String>() {
            @Override
            public String select(Map<String, List<Long>> pattern) throws Exception {
                return String.format("Prints %d as expected", pattern.get("start").get(0));
            }
        }).print();

// Create another stream based on a sliding window over the input stream
SingleOutputStreamOperator<Long> intervals = forces
        .countWindowAll(2, 1)
        .process(new ProcessAllWindowFunction<Long, Long, GlobalWindow>() {
            @Override
            public void process(Context context, Iterable<Long> elements, Collector<Long> out) throws Exception {
                List<Long> items = new ArrayList<>();
                elements.forEach(items::add);
                if (items.size() == 2) {
                    out.collect(items.get(0));
                }
            }
        });

Pattern<Long, Long> intervalMock = Pattern.<Long>begin("start").where(new SimpleCondition<Long>() {
    @Override
    public boolean filter(Long value) throws Exception {
        return true;
    }
});

CEP.pattern(intervals, intervalMock)
        .select(new PatternSelectFunction<Long, String>() {
            @Override
            public String select(Map<String, List<Long>> pattern) throws Exception {
                return String.format("Doesn't print %d", pattern.get("start").get(0));
            }
        }).print();

env.execute();
{code}
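
For reference, here is a minimal plain-Java sketch of which values the windowed stream in the example should hand to the second pattern. It does not use Flink at all. It assumes that countWindowAll(2, 1) fires on every incoming element and keeps at most the two most recent elements, which is my understanding of the sliding count window. The class name CountWindowSketch and the helper firstOfEachFullWindow are hypothetical and exist only for this illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class CountWindowSketch {

    // Hypothetical helper, not part of Flink: mimics a sliding count window
    // with the given size and a slide of 1 (assumption: the window fires once
    // per element and holds at most `size` of the most recent elements).
    // Like the ProcessAllWindowFunction in the example, it emits the first
    // element of a window only when the window is full.
    static List<Long> firstOfEachFullWindow(List<Long> input, int size) {
        Deque<Long> window = new ArrayDeque<>();
        List<Long> emitted = new ArrayList<>();
        for (Long value : input) {
            window.addLast(value);
            if (window.size() > size) {
                window.removeFirst(); // evict the oldest element
            }
            if (window.size() == size) {
                emitted.add(window.peekFirst());
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<Long> input = new ArrayList<>();
        for (long i = 1; i <= 10; i++) {
            input.add(i);
        }
        // prints [1, 2, 3, 4, 5, 6, 7, 8, 9]
        System.out.println(firstOfEachFullWindow(input, 2));
    }
}
```

Under that assumption the second CEP pattern should receive nine events (1 through 9) for the ten input elements, so a select function that is never called is not explained by an empty windowed stream.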


> CEP pattern not called on windowed stream
> -----------------------------------------
>
>                 Key: FLINK-9547
>                 URL: https://issues.apache.org/jira/browse/FLINK-9547
>             Project: Flink
>          Issue Type: Bug
>          Components: CEP
>    Affects Versions: 1.3.2, 1.5.0
>            Reporter: Lucas Resch
>            Priority: Major
>
> When trying to match a pattern on a stream that was windowed, the pattern is never
> called. The following example code shows where the issue was noticed:
> {code:java}
> // Set up stream
> SingleOutputStreamOperator<ForceZ> forces = ...
>         .filter(new FilterForcesFunction())
>         .process(new ProcessForcesFunction());
> // Define mock pattern
> Pattern<ForceZ, ?> forcesMock = Pattern.<ForceZ>begin("start").where(new SimpleCondition<ForceZ>() {
>     @Override
>     public boolean filter(ForceZ value) {
>         // This is called as expected
>         return true;
>     }
> });
> // Print pattern results
> // This actually prints all incoming events as expected
> CEP.pattern(forces, forcesMock)
>         .select(new PatternSelectFunction<ForceZ, ForceZ>() {
>             @Override
>             public ForceZ select(Map<String, List<ForceZ>> pattern){
>                 return pattern.get("start").get(0);
>             }
>         }).print();
> // Create another stream based on a sliding window over the input stream
> SingleOutputStreamOperator<Interval> intervals = forces
>         .countWindowAll(2, 1)
>         .process(new ForceWindowFunction());
> // Define mock pattern
> Pattern<Interval, Interval> intervalMock = Pattern.<Interval>begin("start").where(new SimpleCondition<Interval>() {
>     @Override
>     public boolean filter(Interval value) throws Exception {
>         // This is never called
>         return true;
>     }
> });
> // Print pattern results
> // Doesn't print anything since the mock condition is never called
> CEP.pattern(intervals, intervalMock)
>         .select(new PatternSelectFunction<Interval, Interval>() {
>             @Override
>             public Interval select(Map<String, List<Interval>> pattern) throws Exception {
>                 return pattern.get("start").get(0);
>             }
>         }).print();
> {code}
> Either I'm doing something wrong or this is a major bug.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
