Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id DCCEF200C6C for ; Fri, 31 Mar 2017 00:04:19 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id DB67C160B98; Thu, 30 Mar 2017 22:04:19 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id A4788160BB3 for ; Fri, 31 Mar 2017 00:04:16 +0200 (CEST) Received: (qmail 6725 invoked by uid 500); 30 Mar 2017 22:04:15 -0000 Mailing-List: contact commits-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.apache.org Delivered-To: mailing list commits@flink.apache.org Received: (qmail 4520 invoked by uid 99); 30 Mar 2017 22:04:13 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 30 Mar 2017 22:04:13 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id B460DDFF7C; Thu, 30 Mar 2017 22:04:13 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: fhueske@apache.org To: commits@flink.apache.org Date: Thu, 30 Mar 2017 22:04:44 -0000 Message-Id: In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [33/50] [abbrv] flink git commit: [FLINK-6197] [cep] Add support for iterative conditions. 
archived-at: Thu, 30 Mar 2017 22:04:20 -0000 http://git-wip-us.apache.org/repos/asf/flink/blob/7fbdc100/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/OrCondition.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/OrCondition.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/OrCondition.java new file mode 100644 index 0000000..6aaa4bb --- /dev/null +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/OrCondition.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cep.pattern.conditions; + +/** + * A {@link IterativeCondition condition} which combines two conditions with a logical + * {@code OR} and returns {@code true} if at least one is {@code true}. 
+ * + * @param <T> Type of the element to filter + */ +public class OrCondition<T> extends IterativeCondition<T> { + + private static final long serialVersionUID = 2554610954278485106L; + + private final IterativeCondition<T> left; + private final IterativeCondition<T> right; + + public OrCondition(final IterativeCondition<T> left, final IterativeCondition<T> right) { + this.left = left; + this.right = right; + } + + @Override + public boolean filter(T value, Context<T> ctx) throws Exception { + return left.filter(value, ctx) || right.filter(value, ctx); + } + + /** + * @return One of the {@link IterativeCondition conditions} combined in this condition. + */ + public IterativeCondition<T> getLeft() { + return left; + } + + /** + * @return One of the {@link IterativeCondition conditions} combined in this condition. + */ + public IterativeCondition<T> getRight() { + return right; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/7fbdc100/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/SimpleCondition.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/SimpleCondition.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/SimpleCondition.java new file mode 100644 index 0000000..9ca52c5 --- /dev/null +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/SimpleCondition.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cep.pattern.conditions; + +import org.apache.flink.api.common.functions.FilterFunction; + +/** + * A user-defined condition that decides if an element should be accepted in the pattern or not. + * Accepting an element also signals a state transition for the corresponding {@link org.apache.flink.cep.nfa.NFA}. + * + *

<p>Contrary to the {@link IterativeCondition}, conditions that extend this class do not have access to the + * previously accepted elements in the pattern. Conditions that extend this class are simple {@code filter(...)} + * functions that decide based on the properties of the element at hand. + */ +public abstract class SimpleCondition<T> extends IterativeCondition<T> implements FilterFunction<T> { + + private static final long serialVersionUID = 4942618239408140245L; + + @Override + public boolean filter(T value, Context<T> ctx) throws Exception { + return filter(value); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/7fbdc100/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/SubtypeCondition.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/SubtypeCondition.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/SubtypeCondition.java new file mode 100644 index 0000000..91f6c21 --- /dev/null +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/SubtypeCondition.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cep.pattern.conditions; + +/** + * A {@link IterativeCondition condition} which filters elements of the given type. + * An element is filtered out iff it is not assignable to the given subtype of {@code T}. + * + * @param <T> Type of the elements to be filtered + */ +public class SubtypeCondition<T> extends SimpleCondition<T> { + private static final long serialVersionUID = -2990017519957561355L; + + /** The subtype to filter for. */ + private final Class<? extends T> subtype; + + public SubtypeCondition(final Class<? extends T> subtype) { + this.subtype = subtype; + } + + @Override + public boolean filter(T value) throws Exception { + return subtype.isAssignableFrom(value.getClass()); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/7fbdc100/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java index 5887017..42117ee 100644 --- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java +++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java @@ -18,11 +18,11 @@ package org.apache.flink.cep; -import org.apache.flink.api.common.functions.FilterFunction; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.cep.pattern.Pattern; +import org.apache.flink.cep.pattern.conditions.SimpleCondition; import org.apache.flink.core.fs.FileSystem; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.datastream.DataStream; @@ -81,7 +81,7 @@ public class CEPITCase extends
StreamingMultipleProgramsTestBase { new Event(8, "end", 1.0) ); - Pattern pattern = Pattern.begin("start").where(new FilterFunction() { + Pattern pattern = Pattern.begin("start").where(new SimpleCondition() { @Override public boolean filter(Event value) throws Exception { @@ -89,7 +89,7 @@ public class CEPITCase extends StreamingMultipleProgramsTestBase { } }) .followedBy("middle").subtype(SubEvent.class).where( - new FilterFunction() { + new SimpleCondition() { @Override public boolean filter(SubEvent value) throws Exception { @@ -97,7 +97,7 @@ public class CEPITCase extends StreamingMultipleProgramsTestBase { } } ) - .followedBy("end").where(new FilterFunction() { + .followedBy("end").where(new SimpleCondition() { @Override public boolean filter(Event value) throws Exception { @@ -156,7 +156,7 @@ public class CEPITCase extends StreamingMultipleProgramsTestBase { } }); - Pattern pattern = Pattern.begin("start").where(new FilterFunction() { + Pattern pattern = Pattern.begin("start").where(new SimpleCondition() { @Override public boolean filter(Event value) throws Exception { @@ -164,7 +164,7 @@ public class CEPITCase extends StreamingMultipleProgramsTestBase { } }) .followedBy("middle").subtype(SubEvent.class).where( - new FilterFunction() { + new SimpleCondition() { @Override public boolean filter(SubEvent value) throws Exception { @@ -172,7 +172,7 @@ public class CEPITCase extends StreamingMultipleProgramsTestBase { } } ) - .followedBy("end").where(new FilterFunction() { + .followedBy("end").where(new SimpleCondition() { @Override public boolean filter(Event value) throws Exception { @@ -236,19 +236,19 @@ public class CEPITCase extends StreamingMultipleProgramsTestBase { } }); - Pattern pattern = Pattern.begin("start").where(new FilterFunction() { + Pattern pattern = Pattern.begin("start").where(new SimpleCondition() { @Override public boolean filter(Event value) throws Exception { return value.getName().equals("start"); } - }).followedBy("middle").where(new 
FilterFunction() { + }).followedBy("middle").where(new SimpleCondition() { @Override public boolean filter(Event value) throws Exception { return value.getName().equals("middle"); } - }).followedBy("end").where(new FilterFunction() { + }).followedBy("end").where(new SimpleCondition() { @Override public boolean filter(Event value) throws Exception { @@ -325,19 +325,19 @@ public class CEPITCase extends StreamingMultipleProgramsTestBase { } }); - Pattern pattern = Pattern.begin("start").where(new FilterFunction() { + Pattern pattern = Pattern.begin("start").where(new SimpleCondition() { @Override public boolean filter(Event value) throws Exception { return value.getName().equals("start"); } - }).followedBy("middle").where(new FilterFunction() { + }).followedBy("middle").where(new SimpleCondition() { @Override public boolean filter(Event value) throws Exception { return value.getName().equals("middle"); } - }).followedBy("end").where(new FilterFunction() { + }).followedBy("end").where(new SimpleCondition() { @Override public boolean filter(Event value) throws Exception { @@ -378,7 +378,7 @@ public class CEPITCase extends StreamingMultipleProgramsTestBase { Pattern, ?> pattern = Pattern.>begin("start") - .where(new FilterFunction>() { + .where(new SimpleCondition>() { @Override public boolean filter(Tuple2 rec) throws Exception { return rec.f1 == 1; @@ -456,19 +456,19 @@ public class CEPITCase extends StreamingMultipleProgramsTestBase { } }); - Pattern pattern = Pattern.begin("start").where(new FilterFunction() { + Pattern pattern = Pattern.begin("start").where(new SimpleCondition() { @Override public boolean filter(Event value) throws Exception { return value.getName().equals("start"); } - }).followedBy("middle").where(new FilterFunction() { + }).followedBy("middle").where(new SimpleCondition() { @Override public boolean filter(Event value) throws Exception { return value.getName().equals("middle"); } - }).followedBy("end").where(new FilterFunction() { + 
}).followedBy("end").where(new SimpleCondition() { @Override public boolean filter(Event value) throws Exception { @@ -524,26 +524,26 @@ public class CEPITCase extends StreamingMultipleProgramsTestBase { ); Pattern pattern = Pattern.begin("start") - .where(new FilterFunction() { + .where(new SimpleCondition() { @Override public boolean filter(Event value) throws Exception { return value.getName().equals("start"); } }) .followedBy("middle") - .where(new FilterFunction() { + .where(new SimpleCondition() { @Override public boolean filter(Event value) throws Exception { return value.getPrice() == 2.0; } }) - .or(new FilterFunction() { + .or(new SimpleCondition() { @Override public boolean filter(Event value) throws Exception { return value.getPrice() == 5.0; } }) - .followedBy("end").where(new FilterFunction() { + .followedBy("end").where(new SimpleCondition() { @Override public boolean filter(Event value) throws Exception { http://git-wip-us.apache.org/repos/asf/flink/blob/7fbdc100/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java index 5b05f19..197767e 100644 --- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java +++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java @@ -18,20 +18,26 @@ package org.apache.flink.cep.nfa; +import com.google.common.collect.Lists; import com.google.common.collect.Sets; -import org.apache.flink.api.common.functions.FilterFunction; +import com.google.common.primitives.Doubles; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.cep.Event; import org.apache.flink.cep.SubEvent; import org.apache.flink.cep.nfa.compiler.NFACompiler; import org.apache.flink.cep.pattern.Pattern; +import 
org.apache.flink.cep.pattern.conditions.IterativeCondition; +import org.apache.flink.cep.pattern.conditions.SimpleCondition; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.util.TestLogger; +import org.junit.Assert; import org.junit.Test; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -39,7 +45,6 @@ import java.util.Map; import java.util.Set; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; public class NFAITCase extends TestLogger { @@ -58,23 +63,21 @@ public class NFAITCase extends TestLogger { inputEvents.add(new StreamRecord(new Event(43, "start", 1.0), 4)); inputEvents.add(new StreamRecord(endEvent, 5)); - Pattern pattern = Pattern.begin("start").where(new FilterFunction() { + Pattern pattern = Pattern.begin("start").where(new SimpleCondition() { private static final long serialVersionUID = 5726188262756267490L; @Override public boolean filter(Event value) throws Exception { return value.getName().equals("start"); } - }) - .followedBy("middle").subtype(SubEvent.class).where(new FilterFunction() { - private static final long serialVersionUID = 6215754202506583964L; + }).followedBy("middle").subtype(SubEvent.class).where(new SimpleCondition() { + private static final long serialVersionUID = 6215754202506583964L; - @Override - public boolean filter(SubEvent value) throws Exception { - return value.getVolume() > 5.0; - } - }) - .followedBy("end").where(new FilterFunction() { + @Override + public boolean filter(SubEvent value) throws Exception { + return value.getVolume() > 5.0; + } + }).followedBy("end").where(new SimpleCondition() { private static final long serialVersionUID = 7056763917392056548L; @Override @@ -113,14 +116,14 @@ public class NFAITCase extends TestLogger { 
inputEvents.add(new StreamRecord<>(middleEvent1, 3)); inputEvents.add(new StreamRecord<>(end, 5)); - Pattern pattern = Pattern.begin("middle").where(new FilterFunction() { + Pattern pattern = Pattern.begin("middle").where(new SimpleCondition() { private static final long serialVersionUID = 5726188262756267490L; @Override public boolean filter(Event value) throws Exception { return value.getName().equals("a"); } - }).next("end").where(new FilterFunction() { + }).next("end").where(new SimpleCondition() { private static final long serialVersionUID = 5726188262756267490L; @Override @@ -163,14 +166,14 @@ public class NFAITCase extends TestLogger { inputEvents.add(new StreamRecord<>(middleEvent2, 4)); inputEvents.add(new StreamRecord<>(end, 5)); - Pattern pattern = Pattern.begin("middle").where(new FilterFunction() { + Pattern pattern = Pattern.begin("middle").where(new SimpleCondition() { private static final long serialVersionUID = 5726188262756267490L; @Override public boolean filter(Event value) throws Exception { return value.getName().equals("a"); } - }).next("end").where(new FilterFunction() { + }).next("end").where(new SimpleCondition() { private static final long serialVersionUID = 5726188262756267490L; @Override @@ -217,21 +220,21 @@ public class NFAITCase extends TestLogger { events.add(new StreamRecord(new Event(6, "end", 1.0), 13)); - Pattern pattern = Pattern.begin("start").where(new FilterFunction() { + Pattern pattern = Pattern.begin("start").where(new SimpleCondition() { private static final long serialVersionUID = 7907391379273505897L; @Override public boolean filter(Event value) throws Exception { return value.getName().equals("start"); } - }).followedBy("middle").where(new FilterFunction() { + }).followedBy("middle").where(new SimpleCondition() { private static final long serialVersionUID = -3268741540234334074L; @Override public boolean filter(Event value) throws Exception { return value.getName().equals("middle"); } - }).followedBy("end").where(new 
FilterFunction() { + }).followedBy("end").where(new SimpleCondition() { private static final long serialVersionUID = -8995174172182138608L; @Override @@ -240,11 +243,12 @@ public class NFAITCase extends TestLogger { } }).within(Time.milliseconds(10)); - NFA nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); for (StreamRecord event: events) { - Collection> patterns = nfa.process(event.getValue(), event.getTimestamp()).f0; + Collection> patterns = nfa.process( + event.getValue(), + event.getTimestamp()).f0; resultingPatterns.addAll(patterns); } @@ -269,7 +273,6 @@ public class NFAITCase extends TestLogger { Set, Long>> resultingTimeoutPatterns = new HashSet<>(); Set, Long>> expectedTimeoutPatterns = new HashSet<>(); - events.add(new StreamRecord(new Event(1, "start", 1.0), 1)); events.add(new StreamRecord(new Event(2, "start", 1.0), 2)); events.add(new StreamRecord(new Event(3, "middle", 1.0), 3)); @@ -296,21 +299,21 @@ public class NFAITCase extends TestLogger { expectedTimeoutPatterns.add(Tuple2.of(timeoutPattern3, 11L)); expectedTimeoutPatterns.add(Tuple2.of(timeoutPattern4, 13L)); - Pattern pattern = Pattern.begin("start").where(new FilterFunction() { + Pattern pattern = Pattern.begin("start").where(new SimpleCondition() { private static final long serialVersionUID = 7907391379273505897L; @Override public boolean filter(Event value) throws Exception { return value.getName().equals("start"); } - }).followedBy("middle").where(new FilterFunction() { + }).followedBy("middle").where(new SimpleCondition() { private static final long serialVersionUID = -3268741540234334074L; @Override public boolean filter(Event value) throws Exception { return value.getName().equals("middle"); } - }).followedBy("end").where(new FilterFunction() { + }).followedBy("end").where(new SimpleCondition() { private static final long serialVersionUID = -8995174172182138608L; @Override @@ -319,7 +322,6 @@ public class NFAITCase extends TestLogger { } 
}).within(Time.milliseconds(10)); - NFA nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), true); for (StreamRecord event: events) { @@ -359,38 +361,35 @@ public class NFAITCase extends TestLogger { inputEvents.add(new StreamRecord(nextOne2, 7)); inputEvents.add(new StreamRecord(endEvent, 8)); - Pattern pattern = Pattern.begin("start").where(new FilterFunction() { + Pattern pattern = Pattern.begin("start").where(new SimpleCondition() { private static final long serialVersionUID = 5726188262756267490L; @Override public boolean filter(Event value) throws Exception { return value.getName().equals("start"); } - }) - .followedBy("middle-first").subtype(SubEvent.class).where(new FilterFunction() { - private static final long serialVersionUID = 6215754202506583964L; + }).followedBy("middle-first").subtype(SubEvent.class).where(new SimpleCondition() { + private static final long serialVersionUID = 6215754202506583964L; - @Override - public boolean filter(SubEvent value) throws Exception { - return value.getVolume() > 5.0; - } - }) - .followedBy("middle-second").subtype(SubEvent.class).where(new FilterFunction() { - private static final long serialVersionUID = 6215754202506583964L; + @Override + public boolean filter(SubEvent value) throws Exception { + return value.getVolume() > 5.0; + } + }).followedBy("middle-second").subtype(SubEvent.class).where(new SimpleCondition() { + private static final long serialVersionUID = 6215754202506583964L; - @Override - public boolean filter(SubEvent value) throws Exception { - return value.getName().equals("next-one"); - } - }) - .followedBy("end").where(new FilterFunction() { - private static final long serialVersionUID = 7056763917392056548L; + @Override + public boolean filter(SubEvent value) throws Exception { + return value.getName().equals("next-one"); + } + }).followedBy("end").where(new SimpleCondition() { + private static final long serialVersionUID = 7056763917392056548L; - @Override - public boolean filter(Event 
value) throws Exception { - return value.getName().equals("end"); - } - }); + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("end"); + } + }); NFA nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); @@ -443,44 +442,42 @@ public class NFAITCase extends TestLogger { inputEvents.add(new StreamRecord<>(end3, 8)); inputEvents.add(new StreamRecord<>(end4, 9)); - Pattern pattern = Pattern.begin("start").where(new FilterFunction() { + Pattern pattern = Pattern.begin("start").where(new SimpleCondition() { private static final long serialVersionUID = 5726188262756267490L; @Override public boolean filter(Event value) throws Exception { return value.getName().equals("c"); } - }).followedBy("middle").where(new FilterFunction() { + }).followedBy("middle").where(new SimpleCondition() { private static final long serialVersionUID = 5726188262756267490L; @Override public boolean filter(Event value) throws Exception { return value.getName().equals("a"); } - }).zeroOrMore(false).followedBy("end1").where(new FilterFunction() { + }).zeroOrMore(false).followedBy("end1").where(new SimpleCondition() { private static final long serialVersionUID = 5726188262756267490L; @Override public boolean filter(Event value) throws Exception { return value.getName().equals("b"); } - }) - .followedBy("end2").where(new FilterFunction() { - private static final long serialVersionUID = 5726188262756267490L; + }).followedBy("end2").where(new SimpleCondition() { + private static final long serialVersionUID = 5726188262756267490L; - @Override - public boolean filter(Event value) throws Exception { - return value.getName().equals("d"); - } - }) - .followedBy("end3").where(new FilterFunction() { - private static final long serialVersionUID = 5726188262756267490L; + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("d"); + } + }).followedBy("end3").where(new SimpleCondition() { + private static 
final long serialVersionUID = 5726188262756267490L; - @Override - public boolean filter(Event value) throws Exception { - return value.getName().equals("e"); - } - }); + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("e"); + } + }); NFA nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); @@ -533,21 +530,21 @@ public class NFAITCase extends TestLogger { inputEvents.add(new StreamRecord<>(middleEvent2, 4)); inputEvents.add(new StreamRecord<>(end1, 6)); - Pattern pattern = Pattern.begin("start").where(new FilterFunction() { + Pattern pattern = Pattern.begin("start").where(new SimpleCondition() { private static final long serialVersionUID = 5726188262756267490L; @Override public boolean filter(Event value) throws Exception { return value.getName().equals("c"); } - }).followedBy("middle").where(new FilterFunction() { + }).followedBy("middle").where(new SimpleCondition() { private static final long serialVersionUID = 5726188262756267490L; @Override public boolean filter(Event value) throws Exception { return value.getName().equals("a"); } - }).zeroOrMore(false).followedBy("end1").where(new FilterFunction() { + }).zeroOrMore(false).followedBy("end1").where(new SimpleCondition() { private static final long serialVersionUID = 5726188262756267490L; @Override @@ -597,21 +594,21 @@ public class NFAITCase extends TestLogger { inputEvents.add(new StreamRecord<>(middleEvent3, 5)); inputEvents.add(new StreamRecord<>(end1, 6)); - Pattern pattern = Pattern.begin("start").where(new FilterFunction() { + Pattern pattern = Pattern.begin("start").where(new SimpleCondition() { private static final long serialVersionUID = 5726188262756267490L; @Override public boolean filter(Event value) throws Exception { return value.getName().equals("c"); } - }).followedBy("middle").where(new FilterFunction() { + }).followedBy("middle").where(new SimpleCondition() { private static final long serialVersionUID = 5726188262756267490L; 
@Override public boolean filter(Event value) throws Exception { return value.getName().equals("a"); } - }).zeroOrMore(true).followedBy("end1").where(new FilterFunction() { + }).zeroOrMore(true).followedBy("end1").where(new SimpleCondition() { private static final long serialVersionUID = 5726188262756267490L; @Override @@ -660,14 +657,14 @@ public class NFAITCase extends TestLogger { inputEvents.add(new StreamRecord<>(middleEvent3, 5)); inputEvents.add(new StreamRecord<>(end, 6)); - Pattern pattern = Pattern.begin("middle").where(new FilterFunction() { + Pattern pattern = Pattern.begin("middle").where(new SimpleCondition() { private static final long serialVersionUID = 5726188262756267490L; @Override public boolean filter(Event value) throws Exception { return value.getName().equals("a"); } - }).zeroOrMore().followedBy("end").where(new FilterFunction() { + }).zeroOrMore().followedBy("end").where(new SimpleCondition() { private static final long serialVersionUID = 5726188262756267490L; @Override @@ -720,28 +717,28 @@ public class NFAITCase extends TestLogger { inputEvents.add(new StreamRecord<>(middleEvent3, 5)); inputEvents.add(new StreamRecord<>(end, 6)); - Pattern pattern = Pattern.begin("start").where(new FilterFunction() { + Pattern pattern = Pattern.begin("start").where(new SimpleCondition() { private static final long serialVersionUID = 5726188262756267490L; @Override public boolean filter(Event value) throws Exception { return value.getName().equals("c"); } - }).followedBy("middle-first").where(new FilterFunction() { + }).followedBy("middle-first").where(new SimpleCondition() { private static final long serialVersionUID = 5726188262756267490L; @Override public boolean filter(Event value) throws Exception { return value.getName().equals("a"); } - }).zeroOrMore(false).followedBy("middle-second").where(new FilterFunction() { + }).zeroOrMore(false).followedBy("middle-second").where(new SimpleCondition() { private static final long serialVersionUID = 
5726188262756267490L; @Override public boolean filter(Event value) throws Exception { return value.getName().equals("d"); } - }).zeroOrMore(false).followedBy("end").where(new FilterFunction() { + }).zeroOrMore(false).followedBy("end").where(new SimpleCondition() { private static final long serialVersionUID = 5726188262756267490L; @Override @@ -799,35 +796,35 @@ public class NFAITCase extends TestLogger { inputEvents.add(new StreamRecord<>(kleene2, 7)); inputEvents.add(new StreamRecord<>(end, 8)); - Pattern pattern = Pattern.begin("start").where(new FilterFunction() { + Pattern pattern = Pattern.begin("start").where(new SimpleCondition() { private static final long serialVersionUID = 5726188262756267490L; @Override public boolean filter(Event value) throws Exception { return value.getName().equals("c"); } - }).followedBy("branching").where(new FilterFunction() { + }).followedBy("branching").where(new SimpleCondition() { private static final long serialVersionUID = 5726188262756267490L; @Override public boolean filter(Event value) throws Exception { return value.getName().equals("a"); } - }).followedBy("merging").where(new FilterFunction() { + }).followedBy("merging").where(new SimpleCondition() { private static final long serialVersionUID = 5726188262756267490L; @Override public boolean filter(Event value) throws Exception { return value.getName().equals("f"); } - }).followedBy("kleene").where(new FilterFunction() { + }).followedBy("kleene").where(new SimpleCondition() { private static final long serialVersionUID = 5726188262756267490L; @Override public boolean filter(Event value) throws Exception { return value.getName().equals("d"); } - }).zeroOrMore(false).followedBy("end").where(new FilterFunction() { + }).zeroOrMore(false).followedBy("end").where(new SimpleCondition() { private static final long serialVersionUID = 5726188262756267490L; @Override @@ -881,14 +878,14 @@ public class NFAITCase extends TestLogger { inputEvents.add(new StreamRecord<>(middleEvent3, 
4)); inputEvents.add(new StreamRecord<>(end, 5)); - Pattern pattern = Pattern.begin("start").where(new FilterFunction() { + Pattern pattern = Pattern.begin("start").where(new SimpleCondition() { private static final long serialVersionUID = 5726188262756267490L; @Override public boolean filter(Event value) throws Exception { return value.getName().equals("d"); } - }).followedBy("middle").where(new FilterFunction() { + }).followedBy("middle").where(new SimpleCondition() { private static final long serialVersionUID = 5726188262756267490L; @Override @@ -896,7 +893,7 @@ public class NFAITCase extends TestLogger { return value.getName().equals("a"); } }).zeroOrMore() - .next("end").where(new FilterFunction() { + .next("end").where(new SimpleCondition() { private static final long serialVersionUID = 5726188262756267490L; @Override @@ -909,7 +906,6 @@ public class NFAITCase extends TestLogger { Set> resultingPatterns = new HashSet<>(); - for (StreamRecord inputEvent : inputEvents) { Collection> patterns = nfa.process( inputEvent.getValue(), @@ -937,29 +933,28 @@ public class NFAITCase extends TestLogger { inputEvents.add(new StreamRecord<>(middleEvent2, 3)); inputEvents.add(new StreamRecord<>(end, 5)); - Pattern pattern = Pattern.begin("start").where(new FilterFunction() { + Pattern pattern = Pattern.begin("start").where(new SimpleCondition() { private static final long serialVersionUID = 5726188262756267490L; @Override public boolean filter(Event value) throws Exception { return value.getName().equals("d"); } - }).followedBy("middle").where(new FilterFunction() { + }).followedBy("middle").where(new SimpleCondition() { private static final long serialVersionUID = 5726188262756267490L; @Override public boolean filter(Event value) throws Exception { return value.getName().equals("a"); } - }).zeroOrMore(false) - .next("end").where(new FilterFunction() { - private static final long serialVersionUID = 5726188262756267490L; + }).zeroOrMore(false).next("end").where(new 
SimpleCondition() { + private static final long serialVersionUID = 5726188262756267490L; - @Override - public boolean filter(Event value) throws Exception { - return value.getName().equals("b"); - } - }); + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("b"); + } + }); NFA nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); @@ -998,21 +993,21 @@ public class NFAITCase extends TestLogger { inputEvents.add(new StreamRecord<>(middleEvent2, 4)); inputEvents.add(new StreamRecord<>(end1, 6)); - Pattern pattern = Pattern.begin("start").where(new FilterFunction() { + Pattern pattern = Pattern.begin("start").where(new SimpleCondition() { private static final long serialVersionUID = 5726188262756267490L; @Override public boolean filter(Event value) throws Exception { return value.getName().equals("c"); } - }).followedBy("middle").where(new FilterFunction() { + }).followedBy("middle").where(new SimpleCondition() { private static final long serialVersionUID = 5726188262756267490L; @Override public boolean filter(Event value) throws Exception { return value.getName().equals("a"); } - }).oneOrMore(false).followedBy("end1").where(new FilterFunction() { + }).oneOrMore(false).followedBy("end1").where(new SimpleCondition() { private static final long serialVersionUID = 5726188262756267490L; @Override @@ -1059,14 +1054,14 @@ public class NFAITCase extends TestLogger { inputEvents.add(new StreamRecord<>(startEvent3, 5)); inputEvents.add(new StreamRecord<>(end1, 6)); - Pattern pattern = Pattern.begin("start").where(new FilterFunction() { + Pattern pattern = Pattern.begin("start").where(new SimpleCondition() { private static final long serialVersionUID = 5726188262756267490L; @Override public boolean filter(Event value) throws Exception { return value.getName().equals("a"); } - }).oneOrMore(false).followedBy("end").where(new FilterFunction() { + }).oneOrMore(false).followedBy("end").where(new SimpleCondition() { 
private static final long serialVersionUID = 5726188262756267490L; @Override @@ -1120,21 +1115,21 @@ public class NFAITCase extends TestLogger { inputEvents.add(new StreamRecord<>(middleEvent3, 5L)); inputEvents.add(new StreamRecord<>(endEvent, 6L)); - Pattern pattern = Pattern.begin("start").where(new FilterFunction() { + Pattern pattern = Pattern.begin("start").where(new SimpleCondition() { private static final long serialVersionUID = 6215754202506583964L; @Override public boolean filter(Event value) throws Exception { return value.getName().equals("start"); } - }).next("middle").where(new FilterFunction() { + }).next("middle").where(new SimpleCondition() { private static final long serialVersionUID = 6215754202506583964L; @Override public boolean filter(Event value) throws Exception { return value.getName().equals("middle"); } - }).zeroOrMore(false).followedBy("end").where(new FilterFunction() { + }).zeroOrMore(false).followedBy("end").where(new SimpleCondition() { private static final long serialVersionUID = 7056763917392056548L; @Override @@ -1181,21 +1176,21 @@ public class NFAITCase extends TestLogger { inputEvents.add(new StreamRecord<>(middleEvent3, 5)); inputEvents.add(new StreamRecord<>(end1, 6)); - Pattern pattern = Pattern.begin("start").where(new FilterFunction() { + Pattern pattern = Pattern.begin("start").where(new SimpleCondition() { private static final long serialVersionUID = 5726188262756267490L; @Override public boolean filter(Event value) throws Exception { return value.getName().equals("c"); } - }).followedBy("middle").where(new FilterFunction() { + }).followedBy("middle").where(new SimpleCondition() { private static final long serialVersionUID = 5726188262756267490L; @Override public boolean filter(Event value) throws Exception { return value.getName().equals("a"); } - }).oneOrMore(true).followedBy("end1").where(new FilterFunction() { + }).oneOrMore(true).followedBy("end1").where(new SimpleCondition() { private static final long 
serialVersionUID = 5726188262756267490L; @Override @@ -1240,21 +1235,21 @@ public class NFAITCase extends TestLogger { inputEvents.add(new StreamRecord<>(middleEvent, 5)); inputEvents.add(new StreamRecord<>(end1, 6)); - Pattern pattern = Pattern.begin("start").where(new FilterFunction() { + Pattern pattern = Pattern.begin("start").where(new SimpleCondition() { private static final long serialVersionUID = 5726188262756267490L; @Override public boolean filter(Event value) throws Exception { return value.getName().equals("c"); } - }).followedBy("middle").where(new FilterFunction() { + }).followedBy("middle").where(new SimpleCondition() { private static final long serialVersionUID = 5726188262756267490L; @Override public boolean filter(Event value) throws Exception { return value.getName().equals("a"); } - }).optional().followedBy("end1").where(new FilterFunction() { + }).optional().followedBy("end1").where(new SimpleCondition() { private static final long serialVersionUID = 5726188262756267490L; @Override @@ -1303,21 +1298,21 @@ public class NFAITCase extends TestLogger { inputEvents.add(new StreamRecord<>(middleEvent3, 4)); inputEvents.add(new StreamRecord<>(end1, 6)); - Pattern pattern = Pattern.begin("start").where(new FilterFunction() { + Pattern pattern = Pattern.begin("start").where(new SimpleCondition() { private static final long serialVersionUID = 5726188262756267490L; @Override public boolean filter(Event value) throws Exception { return value.getName().equals("c"); } - }).next("middle").where(new FilterFunction() { + }).next("middle").where(new SimpleCondition() { private static final long serialVersionUID = 5726188262756267490L; @Override public boolean filter(Event value) throws Exception { return value.getName().equals("a"); } - }).times(2).followedBy("end1").where(new FilterFunction() { + }).times(2).followedBy("end1").where(new SimpleCondition() { private static final long serialVersionUID = 5726188262756267490L; @Override @@ -1362,14 +1357,14 @@ 
public class NFAITCase extends TestLogger { inputEvents.add(new StreamRecord<>(middleEvent3, 4)); inputEvents.add(new StreamRecord<>(end1, 6)); - Pattern pattern = Pattern.begin("middle").where(new FilterFunction() { + Pattern pattern = Pattern.begin("middle").where(new SimpleCondition() { private static final long serialVersionUID = 5726188262756267490L; @Override public boolean filter(Event value) throws Exception { return value.getName().equals("a"); } - }).times(2).followedBy("end1").where(new FilterFunction() { + }).times(2).followedBy("end1").where(new SimpleCondition() { private static final long serialVersionUID = 5726188262756267490L; @Override @@ -1411,14 +1406,14 @@ public class NFAITCase extends TestLogger { inputEvents.add(new StreamRecord<>(startEvent, 1)); inputEvents.add(new StreamRecord<>(end1, 6)); - Pattern pattern = Pattern.begin("start").where(new FilterFunction() { + Pattern pattern = Pattern.begin("start").where(new SimpleCondition() { private static final long serialVersionUID = 5726188262756267490L; @Override public boolean filter(Event value) throws Exception { return value.getName().equals("c"); } - }).optional().followedBy("end1").where(new FilterFunction() { + }).optional().followedBy("end1").where(new SimpleCondition() { private static final long serialVersionUID = 5726188262756267490L; @Override @@ -1464,14 +1459,14 @@ public class NFAITCase extends TestLogger { inputEvents.add(new StreamRecord<>(middleEvent2, 4)); inputEvents.add(new StreamRecord<>(middleEvent3, 5)); - Pattern pattern = Pattern.begin("start").where(new FilterFunction() { + Pattern pattern = Pattern.begin("start").where(new SimpleCondition() { private static final long serialVersionUID = 5726188262756267490L; @Override public boolean filter(Event value) throws Exception { return value.getName().equals("c"); } - }).followedBy("middle").where(new FilterFunction() { + }).followedBy("middle").where(new SimpleCondition() { private static final long serialVersionUID = 
5726188262756267490L; @Override @@ -1525,7 +1520,7 @@ public class NFAITCase extends TestLogger { inputEvents.add(new StreamRecord<>(end2, 6)); inputEvents.add(new StreamRecord<>(end3, 6)); - Pattern pattern = Pattern.begin("start").where(new FilterFunction() { + Pattern pattern = Pattern.begin("start").where(new SimpleCondition() { private static final long serialVersionUID = 5726188262756267490L; @Override @@ -1571,14 +1566,14 @@ public class NFAITCase extends TestLogger { inputEvents.add(new StreamRecord<>(startEvent, 1)); inputEvents.add(new StreamRecord<>(middleEvent1, 3)); - Pattern pattern = Pattern.begin("start").where(new FilterFunction() { + Pattern pattern = Pattern.begin("start").where(new SimpleCondition() { private static final long serialVersionUID = 5726188262756267490L; @Override public boolean filter(Event value) throws Exception { return value.getName().equals("c"); } - }).followedBy("middle").where(new FilterFunction() { + }).followedBy("middle").where(new SimpleCondition() { private static final long serialVersionUID = 5726188262756267490L; @Override @@ -1624,14 +1619,14 @@ public class NFAITCase extends TestLogger { inputEvents.add(new StreamRecord<>(middleEvent2, 4)); inputEvents.add(new StreamRecord<>(middleEvent3, 5)); - Pattern pattern = Pattern.begin("start").where(new FilterFunction() { + Pattern pattern = Pattern.begin("start").where(new SimpleCondition() { private static final long serialVersionUID = 5726188262756267490L; @Override public boolean filter(Event value) throws Exception { return value.getName().equals("c"); } - }).followedBy("middle").where(new FilterFunction() { + }).followedBy("middle").where(new SimpleCondition() { private static final long serialVersionUID = 5726188262756267490L; @Override @@ -1676,21 +1671,21 @@ public class NFAITCase extends TestLogger { Event middleEvent3 = new Event(43, "a", 4.0); Event end1 = new Event(44, "b", 5.0); - Pattern pattern = Pattern.begin("start").where(new FilterFunction() { + Pattern 
pattern = Pattern.begin("start").where(new SimpleCondition() { private static final long serialVersionUID = 5726188262756267490L; @Override public boolean filter(Event value) throws Exception { return value.getName().equals("c"); } - }).next("middle").where(new FilterFunction() { + }).next("middle").where(new SimpleCondition() { private static final long serialVersionUID = 5726188262756267490L; @Override public boolean filter(Event value) throws Exception { return value.getName().equals("a"); } - }).times(2).followedBy("end1").where(new FilterFunction() { + }).times(2).followedBy("end1").where(new SimpleCondition() { private static final long serialVersionUID = 5726188262756267490L; @Override @@ -1719,21 +1714,21 @@ public class NFAITCase extends TestLogger { Event middleEvent = new Event(43, "a", 4.0); Event end1 = new Event(44, "b", 5.0); - Pattern pattern = Pattern.begin("start").where(new FilterFunction() { + Pattern pattern = Pattern.begin("start").where(new SimpleCondition() { private static final long serialVersionUID = 5726188262756267490L; @Override public boolean filter(Event value) throws Exception { return value.getName().equals("c"); } - }).followedBy("middle").where(new FilterFunction() { + }).followedBy("middle").where(new SimpleCondition() { private static final long serialVersionUID = 5726188262756267490L; @Override public boolean filter(Event value) throws Exception { return value.getName().equals("a"); } - }).optional().followedBy("end1").where(new FilterFunction() { + }).optional().followedBy("end1").where(new SimpleCondition() { private static final long serialVersionUID = 5726188262756267490L; @Override @@ -1761,21 +1756,21 @@ public class NFAITCase extends TestLogger { Event middleEvent2 = new Event(42, "a", 3.0); Event end1 = new Event(44, "b", 5.0); - Pattern pattern = Pattern.begin("start").where(new FilterFunction() { + Pattern pattern = Pattern.begin("start").where(new SimpleCondition() { private static final long serialVersionUID = 
5726188262756267490L; @Override public boolean filter(Event value) throws Exception { return value.getName().equals("c"); } - }).followedBy("middle").where(new FilterFunction() { + }).followedBy("middle").where(new SimpleCondition() { private static final long serialVersionUID = 5726188262756267490L; @Override public boolean filter(Event value) throws Exception { return value.getName().equals("a"); } - }).oneOrMore(false).followedBy("end1").where(new FilterFunction() { + }).oneOrMore(false).followedBy("end1").where(new SimpleCondition() { private static final long serialVersionUID = 5726188262756267490L; @Override @@ -1805,21 +1800,21 @@ public class NFAITCase extends TestLogger { Event middleEvent2 = new Event(42, "a", 3.0); Event end1 = new Event(44, "b", 5.0); - Pattern pattern = Pattern.begin("start").where(new FilterFunction() { + Pattern pattern = Pattern.begin("start").where(new SimpleCondition() { private static final long serialVersionUID = 5726188262756267490L; @Override public boolean filter(Event value) throws Exception { return value.getName().equals("c"); } - }).followedBy("middle").where(new FilterFunction() { + }).followedBy("middle").where(new SimpleCondition() { private static final long serialVersionUID = 5726188262756267490L; @Override public boolean filter(Event value) throws Exception { return value.getName().equals("a"); } - }).zeroOrMore(false).followedBy("end1").where(new FilterFunction() { + }).zeroOrMore(false).followedBy("end1").where(new SimpleCondition() { private static final long serialVersionUID = 5726188262756267490L; @Override @@ -1841,4 +1836,459 @@ public class NFAITCase extends TestLogger { assertEquals(true, nfa.isEmpty()); } + + ////////////////////// Iterative BooleanConditions ///////////////////////// + + private final Event startEvent1 = new Event(40, "start", 1.0); + private final Event startEvent2 = new Event(40, "start", 2.0); + private final Event startEvent3 = new Event(40, "start", 3.0); + private final Event 
startEvent4 = new Event(40, "start", 4.0); + private final SubEvent middleEvent1 = new SubEvent(41, "foo1", 1.0, 10); + private final SubEvent middleEvent2 = new SubEvent(42, "foo2", 2.0, 10); + private final SubEvent middleEvent3 = new SubEvent(43, "foo3", 3.0, 10); + private final SubEvent middleEvent4 = new SubEvent(43, "foo4", 1.0, 10); + private final Event nextOne = new Event(44, "next-one", 1.0); + private final Event endEvent = new Event(46, "end", 1.0); + + @Test + public void testIterativeWithBranchingPatternEager() { + List> actual = testIterativeWithBranchingPattern(true); + + compareMaps(actual, + Lists.>newArrayList( + Lists.newArrayList(startEvent1, endEvent, middleEvent1, middleEvent2, middleEvent4), + Lists.newArrayList(startEvent1, endEvent, middleEvent2, middleEvent1), + Lists.newArrayList(startEvent1, endEvent, middleEvent1), + Lists.newArrayList(startEvent2, endEvent, middleEvent3, middleEvent4), + Lists.newArrayList(startEvent2, endEvent, middleEvent3) + ) + ); + } + + @Test + public void testIterativeWithBranchingPatternCombinations() { + List> actual = testIterativeWithBranchingPattern(false); + + compareMaps(actual, + Lists.>newArrayList( + Lists.newArrayList(startEvent1, endEvent, middleEvent1, middleEvent2, middleEvent4), + Lists.newArrayList(startEvent1, endEvent, middleEvent2, middleEvent1), + Lists.newArrayList(startEvent1, endEvent, middleEvent4, middleEvent3), + Lists.newArrayList(startEvent1, endEvent, middleEvent4, middleEvent2), + Lists.newArrayList(startEvent1, endEvent, middleEvent3, middleEvent1), + Lists.newArrayList(startEvent2, endEvent, middleEvent3, middleEvent4), + Lists.newArrayList(startEvent1, endEvent, middleEvent4, middleEvent1), + Lists.newArrayList(startEvent1, endEvent, middleEvent4), + Lists.newArrayList(startEvent1, endEvent, middleEvent1), + Lists.newArrayList(startEvent1, endEvent, middleEvent2), + Lists.newArrayList(startEvent1, endEvent, middleEvent3), + Lists.newArrayList(startEvent2, endEvent, 
middleEvent4), + Lists.newArrayList(startEvent2, endEvent, middleEvent3) + ) + ); + } + + private List> testIterativeWithBranchingPattern(boolean eager) { + List> inputEvents = new ArrayList<>(); + + inputEvents.add(new StreamRecord<>(startEvent1, 1)); + inputEvents.add(new StreamRecord(middleEvent1, 2)); + inputEvents.add(new StreamRecord(middleEvent2, 3)); + inputEvents.add(new StreamRecord<>(startEvent2, 4)); + inputEvents.add(new StreamRecord(middleEvent3, 5)); + inputEvents.add(new StreamRecord(middleEvent4, 5)); + inputEvents.add(new StreamRecord<>(nextOne, 6)); + inputEvents.add(new StreamRecord<>(endEvent, 8)); + + Pattern pattern = Pattern.begin("start").where(new SimpleCondition() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("start"); + } + }).followedBy("middle").subtype(SubEvent.class).where(new IterativeCondition() { + private static final long serialVersionUID = 6215754202506583964L; + + @Override + public boolean filter(SubEvent value, Context ctx) throws Exception { + if (!value.getName().startsWith("foo")) { + return false; + } + + double sum = 0.0; + for (Event event : ctx.getEventsForPattern("middle")) { + sum += event.getPrice(); + } + sum += value.getPrice(); + return Double.compare(sum, 5.0) < 0; + } + }).oneOrMore(eager).followedBy("end").where(new SimpleCondition() { + private static final long serialVersionUID = 7056763917392056548L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("end"); + } + }); + + NFA nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); + + List> resultingPatterns = new ArrayList<>(); + + for (StreamRecord inputEvent : inputEvents) { + Collection> patterns = nfa.process( + inputEvent.getValue(), + inputEvent.getTimestamp()).f0; + + for (Map p: patterns) { + resultingPatterns.add(new ArrayList<>(p.values())); + } + } + + 
return resultingPatterns; + } + + @Test + public void testIterativeWithLoopingStartingEager() { + List> actual = testIterativeWithLoopingStarting(true); + + compareMaps(actual, + Lists.>newArrayList( + Lists.newArrayList(startEvent1, startEvent2, endEvent), + Lists.newArrayList(startEvent1, endEvent), + Lists.newArrayList(startEvent2, endEvent), + Lists.newArrayList(startEvent3, endEvent), + Lists.newArrayList(endEvent) + ) + ); + } + + @Test + public void testIterativeWithLoopingStartingCombination() { + List> actual = testIterativeWithLoopingStarting(false); + + compareMaps(actual, + Lists.>newArrayList( + Lists.newArrayList(startEvent1, startEvent2, endEvent), + Lists.newArrayList(startEvent1, startEvent3, endEvent), + Lists.newArrayList(startEvent1, endEvent), + Lists.newArrayList(startEvent2, endEvent), + Lists.newArrayList(startEvent3, endEvent), + Lists.newArrayList(endEvent) + ) + ); + } + + private List> testIterativeWithLoopingStarting(boolean eager) { + List> inputEvents = new ArrayList<>(); + + inputEvents.add(new StreamRecord<>(startEvent1, 1L)); + inputEvents.add(new StreamRecord<>(startEvent2, 2L)); + inputEvents.add(new StreamRecord<>(startEvent3, 3L)); + inputEvents.add(new StreamRecord<>(endEvent, 4L)); + + // for now, a pattern inherits its continuity property from the followedBy() or next(), and the default + // behavior (which is the one applied in the case that the pattern graph starts with such a pattern) + // of a looping pattern is with relaxed continuity (as in followedBy). 
+ + Pattern pattern = Pattern.begin("start").where(new IterativeCondition() { + private static final long serialVersionUID = 6215754202506583964L; + + @Override + public boolean filter(Event value, Context ctx) throws Exception { + if (!value.getName().equals("start")) { + return false; + } + + double sum = 0.0; + for (Event event : ctx.getEventsForPattern("start")) { + sum += event.getPrice(); + } + sum += value.getPrice(); + return Double.compare(sum, 5.0) < 0; + } + }).zeroOrMore(eager).followedBy("end").where(new SimpleCondition() { + private static final long serialVersionUID = 7056763917392056548L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("end"); + } + }); + + NFA nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); + + List> resultingPatterns = new ArrayList<>(); + + for (StreamRecord inputEvent : inputEvents) { + Collection> patterns = nfa.process( + inputEvent.getValue(), + inputEvent.getTimestamp()).f0; + + for (Map p: patterns) { + resultingPatterns.add(new ArrayList<>(p.values())); + } + } + + return resultingPatterns; + } + + @Test + public void testIterativeWithPrevPatternDependency() { + List> inputEvents = new ArrayList<>(); + + inputEvents.add(new StreamRecord<>(startEvent1, 1L)); + inputEvents.add(new StreamRecord<>(startEvent2, 2L)); + inputEvents.add(new StreamRecord<>(endEvent, 4L)); + + Pattern pattern = Pattern.begin("start").where(new SimpleCondition() { + private static final long serialVersionUID = 6215754202506583964L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("start"); + } + }).oneOrMore().followedBy("end").where(new IterativeCondition() { + private static final long serialVersionUID = 7056763917392056548L; + + @Override + public boolean filter(Event value, Context ctx) throws Exception { + if (!value.getName().equals("end")) { + return false; + } + + double sum = 0.0; + for (Event event : 
ctx.getEventsForPattern("start")) { + sum += event.getPrice(); + } + return Double.compare(sum, 2.0) >= 0; + } + }); + + NFA nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); + + List> resultingPatterns = new ArrayList<>(); + + for (StreamRecord inputEvent : inputEvents) { + Collection> patterns = nfa.process( + inputEvent.getValue(), + inputEvent.getTimestamp()).f0; + + for (Map p: patterns) { + resultingPatterns.add(new ArrayList<>(p.values())); + } + } + + compareMaps(resultingPatterns, + Lists.>newArrayList( + Lists.newArrayList(startEvent1, startEvent2, endEvent), + Lists.newArrayList(startEvent2, endEvent) + ) + ); + } + + @Test + public void testIterativeWithABACPattern() { + List> inputEvents = new ArrayList<>(); + + inputEvents.add(new StreamRecord<>(startEvent1, 1L)); //1 + inputEvents.add(new StreamRecord(middleEvent1, 2L)); //1 + + inputEvents.add(new StreamRecord<>(startEvent2, 2L)); //2 + inputEvents.add(new StreamRecord<>(startEvent3, 2L)); //3 + inputEvents.add(new StreamRecord(middleEvent2, 2L)); //2 + + inputEvents.add(new StreamRecord<>(startEvent4, 2L)); //4 + inputEvents.add(new StreamRecord(middleEvent3, 2L)); //3 + inputEvents.add(new StreamRecord(middleEvent4, 2L)); //1 + inputEvents.add(new StreamRecord<>(endEvent, 4L)); + + Pattern pattern = Pattern.begin("start").where(new SimpleCondition() { + private static final long serialVersionUID = 6215754202506583964L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("start"); + } + }).followedBy("middle1").subtype(SubEvent.class).where(new SimpleCondition() { + private static final long serialVersionUID = 2178338526904474690L; + + @Override + public boolean filter(SubEvent value) throws Exception { + return value.getName().startsWith("foo"); + } + }).followedBy("middle2").where(new IterativeCondition() { + private static final long serialVersionUID = -1223388426808292695L; + + @Override + public boolean filter(Event 
value, Context ctx) throws Exception { + if (!value.getName().equals("start")) { + return false; + } + + double sum = 0.0; + for (Event e: ctx.getEventsForPattern("middle2")) { + sum += e.getPrice(); + } + sum += value.getPrice(); + return Double.compare(sum, 5.0) <= 0; + } + }).oneOrMore().followedBy("end").where(new SimpleCondition() { + private static final long serialVersionUID = 562590474115118323L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("end"); + } + }); + + NFA nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); + + List> resultingPatterns = new ArrayList<>(); + + for (StreamRecord inputEvent : inputEvents) { + Collection> patterns = nfa.process( + inputEvent.getValue(), + inputEvent.getTimestamp()).f0; + + for (Map p: patterns) { + resultingPatterns.add(new ArrayList<>(p.values())); + } + } + + compareMaps(resultingPatterns, + Lists.>newArrayList( + Lists.newArrayList(startEvent1, startEvent2, startEvent3, middleEvent1, endEvent), + Lists.newArrayList(startEvent1, middleEvent1, startEvent2, endEvent), + Lists.newArrayList(startEvent1, middleEvent2, startEvent4, endEvent), + Lists.newArrayList(startEvent2, middleEvent2, startEvent4, endEvent), + Lists.newArrayList(startEvent3, middleEvent2, startEvent4, endEvent) + ) + ); + } + + @Test + public void testIterativeWithPrevPatternDependencyAfterBranching() { + List> inputEvents = new ArrayList<>(); + + inputEvents.add(new StreamRecord<>(startEvent1, 1L)); + inputEvents.add(new StreamRecord<>(startEvent2, 2L)); + inputEvents.add(new StreamRecord(middleEvent1, 4L)); + inputEvents.add(new StreamRecord<>(startEvent3, 5L)); + inputEvents.add(new StreamRecord(middleEvent2, 6L)); + inputEvents.add(new StreamRecord<>(endEvent, 7L)); + + Pattern pattern = Pattern.begin("start").where(new SimpleCondition() { + private static final long serialVersionUID = 6215754202506583964L; + + @Override + public boolean filter(Event value) throws 
Exception { + return value.getName().equals("start"); + } + }).oneOrMore().followedBy("middle1").subtype(SubEvent.class).where(new SimpleCondition() { + private static final long serialVersionUID = 2178338526904474690L; + + @Override + public boolean filter(SubEvent value) throws Exception { + return value.getName().startsWith("foo"); + } + }).followedBy("end").where(new IterativeCondition() { + private static final long serialVersionUID = 7056763917392056548L; + + @Override + public boolean filter(Event value, Context ctx) throws Exception { + if (!value.getName().equals("end")) { + return false; + } + + double sum = 0.0; + for (Event event : ctx.getEventsForPattern("start")) { + sum += event.getPrice(); + } + return Double.compare(sum, 2.0) >= 0; + } + }); + + NFA nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); + + List> resultingPatterns = new ArrayList<>(); + + for (StreamRecord inputEvent : inputEvents) { + Collection> patterns = nfa.process( + inputEvent.getValue(), + inputEvent.getTimestamp()).f0; + + for (Map p: patterns) { + resultingPatterns.add(new ArrayList<>(p.values())); + } + } + + compareMaps(resultingPatterns, + Lists.>newArrayList( + Lists.newArrayList(startEvent1, startEvent2, middleEvent1, endEvent), + Lists.newArrayList(startEvent2, middleEvent1, endEvent), + Lists.newArrayList(startEvent1, startEvent2, middleEvent2, endEvent), + Lists.newArrayList(startEvent1, startEvent2, startEvent3, middleEvent2, endEvent), + Lists.newArrayList(startEvent2, startEvent3, middleEvent2, endEvent), + Lists.newArrayList(startEvent2, middleEvent2, endEvent), + Lists.newArrayList(startEvent3, middleEvent2, endEvent) + ) + ); + } + + private void compareMaps(List> actual, List> expected) { + Assert.assertEquals(expected.size(), actual.size()); + + for (List p: actual) { + Collections.sort(p, new EventComparator()); + } + + for (List p: expected) { + Collections.sort(p, new EventComparator()); + } + + Collections.sort(actual, new 
ListEventComparator()); + Collections.sort(expected, new ListEventComparator()); + Assert.assertArrayEquals(expected.toArray(), actual.toArray()); + } + + private class ListEventComparator implements Comparator> { + + @Override + public int compare(List o1, List o2) { + int sizeComp = Integer.compare(o1.size(), o2.size()); + if (sizeComp == 0) { + EventComparator comp = new EventComparator(); + for (int i = 0; i < o1.size(); i++) { + int eventComp = comp.compare(o1.get(i), o2.get(i)); + if (eventComp != 0) { + return eventComp; + } + } + return 0; + } else { + return sizeComp; + } + } + } + + private class EventComparator implements Comparator { + + @Override + public int compare(Event o1, Event o2) { + int nameComp = o1.getName().compareTo(o2.getName()); + int priceComp = Doubles.compare(o1.getPrice(), o2.getPrice()); + int idComp = Integer.compare(o1.getId(), o2.getId()); + if (nameComp == 0) { + if (priceComp == 0) { + return idComp; + } else { + return priceComp; + } + } else { + return nameComp; + } + } + } } http://git-wip-us.apache.org/repos/asf/flink/blob/7fbdc100/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java index 40a0e7e..d2e392b 100644 --- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java +++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java @@ -19,9 +19,9 @@ package org.apache.flink.cep.nfa; import org.apache.commons.io.output.ByteArrayOutputStream; -import org.apache.flink.api.common.functions.FilterFunction; import org.apache.flink.cep.Event; -import org.apache.flink.cep.pattern.FilterFunctions; +import org.apache.flink.cep.pattern.conditions.BooleanConditions; +import 
org.apache.flink.cep.pattern.conditions.SimpleCondition; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.util.TestLogger; import org.junit.Test; @@ -58,7 +58,7 @@ public class NFATest extends TestLogger { startState.addTake( endState, - new FilterFunction() { + new SimpleCondition() { private static final long serialVersionUID = -4869589195918650396L; @Override @@ -68,7 +68,7 @@ public class NFATest extends TestLogger { }); endState.addTake( endingState, - new FilterFunction() { + new SimpleCondition() { private static final long serialVersionUID = 2979804163709590673L; @Override @@ -76,7 +76,7 @@ public class NFATest extends TestLogger { return value.getName().equals("end"); } }); - endState.addIgnore(FilterFunctions.trueFunction()); + endState.addIgnore(BooleanConditions.trueFunction()); nfa.addState(startState); nfa.addState(endState); @@ -241,7 +241,7 @@ public class NFATest extends TestLogger { startState.addTake( endState, - new FilterFunction() { + new SimpleCondition() { private static final long serialVersionUID = -4869589195918650396L; @Override @@ -251,7 +251,7 @@ public class NFATest extends TestLogger { }); endState.addTake( endingState, - new FilterFunction() { + new SimpleCondition() { private static final long serialVersionUID = 2979804163709590673L; @Override @@ -259,7 +259,7 @@ public class NFATest extends TestLogger { return value.getName().equals("end"); } }); - endState.addIgnore(FilterFunctions.trueFunction()); + endState.addIgnore(BooleanConditions.trueFunction()); nfa.addState(startState); nfa.addState(endState); @@ -268,7 +268,7 @@ public class NFATest extends TestLogger { return nfa; } - private static class NameFilter implements FilterFunction { + private static class NameFilter extends SimpleCondition { private static final long serialVersionUID = 7472112494752423802L; 
http://git-wip-us.apache.org/repos/asf/flink/blob/7fbdc100/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SharedBufferTest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SharedBufferTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SharedBufferTest.java index 25618d5..f0a25d2 100644 --- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SharedBufferTest.java +++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SharedBufferTest.java @@ -84,11 +84,16 @@ public class SharedBufferTest extends TestLogger { sharedBuffer.put("b", events[7], timestamp, "a[]", events[6], timestamp, DeweyNumber.fromString("1.1.0")); Collection> patterns3 = sharedBuffer.extractPatterns("b", events[7], timestamp, DeweyNumber.fromString("1.1.0")); - sharedBuffer.remove("b", events[7], timestamp); + sharedBuffer.release("b", events[7], timestamp); Collection> patterns4 = sharedBuffer.extractPatterns("b", events[7], timestamp, DeweyNumber.fromString("1.1.0")); Collection> patterns1 = sharedBuffer.extractPatterns("b", events[5], timestamp, DeweyNumber.fromString("2.0.0")); Collection> patterns2 = sharedBuffer.extractPatterns("b", events[5], timestamp, DeweyNumber.fromString("1.0.0")); - sharedBuffer.remove("b", events[5], timestamp); + sharedBuffer.release("b", events[5], timestamp); + + assertEquals(1L, patterns3.size()); + assertEquals(0L, patterns4.size()); + assertEquals(1L, patterns1.size()); + assertEquals(1L, patterns2.size()); assertTrue(sharedBuffer.isEmpty()); assertTrue(patterns4.isEmpty()); http://git-wip-us.apache.org/repos/asf/flink/blob/7fbdc100/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java ---------------------------------------------------------------------- diff --git 
a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java index 93d78cc..80b1bcb 100644 --- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java +++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java @@ -20,7 +20,6 @@ package org.apache.flink.cep.nfa.compiler; import com.google.common.collect.Sets; import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.functions.FilterFunction; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.typeutils.TypeExtractor; @@ -32,6 +31,7 @@ import org.apache.flink.cep.nfa.StateTransition; import org.apache.flink.cep.nfa.StateTransitionAction; import org.apache.flink.cep.pattern.MalformedPatternException; import org.apache.flink.cep.pattern.Pattern; +import org.apache.flink.cep.pattern.conditions.SimpleCondition; import org.apache.flink.util.TestLogger; import org.junit.Rule; import org.junit.Test; @@ -48,7 +48,7 @@ import static org.junit.Assert.assertTrue; public class NFACompilerTest extends TestLogger { - private static final FilterFunction startFilter = new FilterFunction() { + private static final SimpleCondition startFilter = new SimpleCondition() { private static final long serialVersionUID = 3314714776170474221L; @Override @@ -57,7 +57,7 @@ public class NFACompilerTest extends TestLogger { } }; - private static final FilterFunction endFilter = new FilterFunction() { + private static final SimpleCondition endFilter = new SimpleCondition() { private static final long serialVersionUID = 3990995859716364087L; @Override @@ -91,7 +91,7 @@ public class NFACompilerTest extends TestLogger { * A filter implementation to test invalid pattern specification with * duplicate pattern names. 
Check {@link #testNFACompilerUniquePatternName()}. */ - private static class TestFilter implements FilterFunction { + private static class TestFilter extends SimpleCondition { private static final long serialVersionUID = -3863103355752267133L; http://git-wip-us.apache.org/repos/asf/flink/blob/7fbdc100/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java index 5a3e623..b83eb3c 100644 --- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java +++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java @@ -18,7 +18,6 @@ package org.apache.flink.cep.operator; -import org.apache.flink.api.common.functions.FilterFunction; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeutils.base.ByteSerializer; import org.apache.flink.api.common.typeutils.base.IntSerializer; @@ -29,6 +28,7 @@ import org.apache.flink.cep.SubEvent; import org.apache.flink.cep.nfa.NFA; import org.apache.flink.cep.nfa.compiler.NFACompiler; import org.apache.flink.cep.pattern.Pattern; +import org.apache.flink.cep.pattern.conditions.SimpleCondition; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; @@ -239,7 +239,7 @@ public class CEPMigration11to13Test { } } - private static class StartFilter implements FilterFunction { + private static class StartFilter extends SimpleCondition { private static final long serialVersionUID = 5726188262756267490L; @Override @@ -248,7 +248,7 @@ public class CEPMigration11to13Test { } } 
- private static class MiddleFilter implements FilterFunction { + private static class MiddleFilter extends SimpleCondition { private static final long serialVersionUID = 6215754202506583964L; @Override @@ -257,7 +257,7 @@ public class CEPMigration11to13Test { } } - private static class EndFilter implements FilterFunction { + private static class EndFilter extends SimpleCondition { private static final long serialVersionUID = 7056763917392056548L; @Override http://git-wip-us.apache.org/repos/asf/flink/blob/7fbdc100/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration12to13Test.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration12to13Test.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration12to13Test.java index 65fa733..f230bbc 100644 --- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration12to13Test.java +++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration12to13Test.java @@ -17,7 +17,6 @@ */ package org.apache.flink.cep.operator; -import org.apache.flink.api.common.functions.FilterFunction; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeutils.base.IntSerializer; import org.apache.flink.api.java.functions.KeySelector; @@ -26,6 +25,7 @@ import org.apache.flink.cep.SubEvent; import org.apache.flink.cep.nfa.NFA; import org.apache.flink.cep.nfa.compiler.NFACompiler; import org.apache.flink.cep.pattern.Pattern; +import org.apache.flink.cep.pattern.conditions.SimpleCondition; import org.apache.flink.runtime.state.KeyGroupsStateHandle; import org.apache.flink.runtime.state.OperatorStateHandle; import org.apache.flink.runtime.state.StreamStateHandle; @@ -448,7 +448,7 @@ public class CEPMigration12to13Test { } } - private static class StartFilter implements FilterFunction { 
+ private static class StartFilter extends SimpleCondition { private static final long serialVersionUID = 5726188262756267490L; @Override @@ -457,7 +457,7 @@ public class CEPMigration12to13Test { } } - private static class MiddleFilter implements FilterFunction { + private static class MiddleFilter extends SimpleCondition { private static final long serialVersionUID = 6215754202506583964L; @Override @@ -466,7 +466,7 @@ public class CEPMigration12to13Test { } } - private static class EndFilter implements FilterFunction { + private static class EndFilter extends SimpleCondition { private static final long serialVersionUID = 7056763917392056548L; @Override http://git-wip-us.apache.org/repos/asf/flink/blob/7fbdc100/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java index a99db05..726c8b8 100644 --- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java +++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java @@ -19,7 +19,6 @@ package org.apache.flink.cep.operator; import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.functions.FilterFunction; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeutils.base.IntSerializer; import org.apache.flink.api.java.functions.KeySelector; @@ -30,6 +29,7 @@ import org.apache.flink.cep.SubEvent; import org.apache.flink.cep.nfa.NFA; import org.apache.flink.cep.nfa.compiler.NFACompiler; import org.apache.flink.cep.pattern.Pattern; +import org.apache.flink.cep.pattern.conditions.SimpleCondition; import org.apache.flink.contrib.streaming.state.RocksDBStateBackend; import 
org.apache.flink.runtime.state.memory.MemoryStateBackend; import org.apache.flink.streaming.api.watermark.Watermark; @@ -434,7 +434,7 @@ public class CEPOperatorTest extends TestLogger { harness.close(); } - + private void verifyWatermark(Object outputObject, long timestamp) { assertTrue(outputObject instanceof Watermark); assertEquals(timestamp, ((Watermark) outputObject).getTimestamp()); @@ -512,7 +512,7 @@ public class CEPOperatorTest extends TestLogger { @Override public NFA createNFA() { - Pattern pattern = Pattern.begin("start").where(new FilterFunction() { + Pattern pattern = Pattern.begin("start").where(new SimpleCondition() { private static final long serialVersionUID = 5726188262756267490L; @Override @@ -520,7 +520,7 @@ public class CEPOperatorTest extends TestLogger { return value.getName().equals("start"); } }) - .followedBy("middle").subtype(SubEvent.class).where(new FilterFunction() { + .followedBy("middle").subtype(SubEvent.class).where(new SimpleCondition() { private static final long serialVersionUID = 6215754202506583964L; @Override @@ -528,7 +528,7 @@ public class CEPOperatorTest extends TestLogger { return value.getVolume() > 5.0; } }) - .followedBy("end").where(new FilterFunction() { + .followedBy("end").where(new SimpleCondition() { private static final long serialVersionUID = 7056763917392056548L; @Override http://git-wip-us.apache.org/repos/asf/flink/blob/7fbdc100/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPRescalingTest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPRescalingTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPRescalingTest.java index 399662a..2c86648 100644 --- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPRescalingTest.java +++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPRescalingTest.java 
@@ -19,7 +19,6 @@ package org.apache.flink.cep.operator; import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.functions.FilterFunction; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.cep.Event; @@ -27,6 +26,7 @@ import org.apache.flink.cep.SubEvent; import org.apache.flink.cep.nfa.NFA; import org.apache.flink.cep.nfa.compiler.NFACompiler; import org.apache.flink.cep.pattern.Pattern; +import org.apache.flink.cep.pattern.conditions.SimpleCondition; import org.apache.flink.runtime.state.KeyGroupRangeAssignment; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.api.windowing.time.Time; @@ -371,7 +371,7 @@ public class CEPRescalingTest { @Override public NFA createNFA() { - Pattern pattern = Pattern.begin("start").where(new FilterFunction() { + Pattern pattern = Pattern.begin("start").where(new SimpleCondition() { private static final long serialVersionUID = 5726188262756267490L; @Override @@ -379,7 +379,7 @@ public class CEPRescalingTest { return value.getName().equals("start"); } }) - .followedBy("middle").subtype(SubEvent.class).where(new FilterFunction() { + .followedBy("middle").subtype(SubEvent.class).where(new SimpleCondition() { private static final long serialVersionUID = 6215754202506583964L; @Override @@ -387,7 +387,7 @@ public class CEPRescalingTest { return value.getVolume() > 5.0; } }) - .followedBy("end").where(new FilterFunction() { + .followedBy("end").where(new SimpleCondition() { private static final long serialVersionUID = 7056763917392056548L; @Override