Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id AAB73200C83 for ; Sun, 28 May 2017 10:52:01 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id A9352160BDB; Sun, 28 May 2017 08:52:01 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 87489160BB1 for ; Sun, 28 May 2017 10:51:59 +0200 (CEST) Received: (qmail 83053 invoked by uid 500); 28 May 2017 08:51:58 -0000 Mailing-List: contact commits-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.apache.org Delivered-To: mailing list commits@flink.apache.org Received: (qmail 83015 invoked by uid 99); 28 May 2017 08:51:57 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Sun, 28 May 2017 08:51:57 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 8F21DE00AF; Sun, 28 May 2017 08:51:57 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: chesnay@apache.org To: commits@flink.apache.org Date: Sun, 28 May 2017 08:51:57 -0000 Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: [1/3] flink git commit: [FLINK-6137] Activate strict checkstyle for flink-cep archived-at: Sun, 28 May 2017 08:52:01 -0000 Repository: flink Updated Branches: refs/heads/master 4f50dc4df -> c9e574bf3 http://git-wip-us.apache.org/repos/asf/flink/blob/c9e574bf/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATestUtilities.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATestUtilities.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATestUtilities.java new file mode 100644 index 0000000..7bf0767 --- /dev/null +++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATestUtilities.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cep.nfa; + +import org.apache.flink.cep.Event; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; + +import com.google.common.primitives.Doubles; +import org.junit.Assert; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.Map; + +/** + * Base method for IT tests of {@link NFA}. It provides utility methods. + */ +public class NFATestUtilities { + + public static List> feedNFA(List> inputEvents, NFA nfa) { + List> resultingPatterns = new ArrayList<>(); + + for (StreamRecord inputEvent : inputEvents) { + Collection>> patterns = nfa.process( + inputEvent.getValue(), + inputEvent.getTimestamp()).f0; + + for (Map> p: patterns) { + List res = new ArrayList<>(); + for (List le: p.values()) { + res.addAll(le); + } + resultingPatterns.add(res); + } + } + return resultingPatterns; + } + + public static void compareMaps(List> actual, List> expected) { + Assert.assertEquals(expected.size(), actual.size()); + + for (List p: actual) { + Collections.sort(p, new EventComparator()); + } + + for (List p: expected) { + Collections.sort(p, new EventComparator()); + } + + Collections.sort(actual, new ListEventComparator()); + Collections.sort(expected, new ListEventComparator()); + Assert.assertArrayEquals(expected.toArray(), actual.toArray()); + } + + private static class ListEventComparator implements Comparator> { + + @Override + public int compare(List o1, List o2) { + int sizeComp = Integer.compare(o1.size(), o2.size()); + if (sizeComp == 0) { + EventComparator comp = new EventComparator(); + for (int i = 0; i < o1.size(); i++) { + int eventComp = comp.compare(o1.get(i), o2.get(i)); + if (eventComp != 0) { + return eventComp; + } + } + return 0; + } else { + return sizeComp; + } + } + } + + private static class EventComparator implements Comparator { + + @Override + public int compare(Event o1, Event o2) { + int nameComp = o1.getName().compareTo(o2.getName()); + int priceComp = Doubles.compare(o1.getPrice(), o2.getPrice()); + int idComp = Integer.compare(o1.getId(), o2.getId()); + if (nameComp == 0) { + if (priceComp == 0) { + return idComp; + } else { + return priceComp; + } + } else { + return nameComp; + } + } + } + + private NFATestUtilities() { + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/c9e574bf/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NotPatternITCase.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NotPatternITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NotPatternITCase.java new file mode 100644 index 0000000..3b95eb4 --- /dev/null +++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NotPatternITCase.java @@ -0,0 +1,1036 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cep.nfa; + +import org.apache.flink.cep.Event; +import org.apache.flink.cep.nfa.compiler.NFACompiler; +import org.apache.flink.cep.pattern.Pattern; +import org.apache.flink.cep.pattern.conditions.SimpleCondition; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.util.TestLogger; + +import com.google.common.collect.Lists; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; + +import static org.apache.flink.cep.nfa.NFATestUtilities.compareMaps; +import static org.apache.flink.cep.nfa.NFATestUtilities.feedNFA; +import static org.junit.Assert.assertEquals; + +/** + * Tests for {@link Pattern#notFollowedBy(String)} and {@link Pattern#notNext(String)}. + */ +@SuppressWarnings("unchecked") +public class NotPatternITCase extends TestLogger { + + @Test + public void testNotNext() { + List> inputEvents = new ArrayList<>(); + + Event a1 = new Event(40, "a", 1.0); + Event c1 = new Event(41, "c", 2.0); + Event b1 = new Event(42, "b", 3.0); + Event c2 = new Event(43, "c", 4.0); + Event d = new Event(43, "d", 4.0); + + inputEvents.add(new StreamRecord<>(a1, 1)); + inputEvents.add(new StreamRecord<>(c1, 2)); + inputEvents.add(new StreamRecord<>(b1, 3)); + inputEvents.add(new StreamRecord<>(c2, 4)); + inputEvents.add(new StreamRecord<>(d, 5)); + + Pattern pattern = Pattern.begin("start").where(new SimpleCondition() { + private static final long serialVersionUID = 5167288560432018992L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("a"); + } + }).notNext("notPattern").where(new SimpleCondition() { + private static final long serialVersionUID = 2242479288129905510L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("b"); + } + }).followedByAny("middle").where(new SimpleCondition() { + private static final long serialVersionUID = 1404509325548220892L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("c"); + } + }).followedBy("end").where(new SimpleCondition() { + private static final long serialVersionUID = -8907427230007830915L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("d"); + } + }); + + NFA nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); + + final List> matches = feedNFA(inputEvents, nfa); + + compareMaps(matches, Lists.>newArrayList( + Lists.newArrayList(a1, c1, d), + Lists.newArrayList(a1, c2, d) + )); + } + + @Test + public void testNotNextNoMatches() { + List> inputEvents = new ArrayList<>(); + + Event a1 = new Event(40, "a", 1.0); + Event b1 = new Event(42, "b", 3.0); + Event c1 = new Event(41, "c", 2.0); + Event c2 = new Event(43, "c", 4.0); + Event d = new Event(43, "d", 4.0); + + inputEvents.add(new StreamRecord<>(a1, 1)); + inputEvents.add(new StreamRecord<>(b1, 2)); + inputEvents.add(new StreamRecord<>(c1, 3)); + inputEvents.add(new StreamRecord<>(c2, 4)); + inputEvents.add(new StreamRecord<>(d, 5)); + + Pattern pattern = Pattern.begin("start").where(new SimpleCondition() { + private static final long serialVersionUID = -339500190577666439L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("a"); + } + }).notNext("notPattern").where(new SimpleCondition() { + private static final long serialVersionUID = -6913980632538046451L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("b"); + } + }).followedBy("middle").where(new SimpleCondition() { + private static final long serialVersionUID = 3332196998905139891L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("c"); + } + }).followedBy("end").where(new SimpleCondition() { + private static final long serialVersionUID = 2086563479959018387L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("d"); + } + }); + + NFA nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); + + final List> matches = feedNFA(inputEvents, nfa); + + assertEquals(0, matches.size()); + } + + @Test + public void testNotNextNoMatchesAtTheEnd() { + List> inputEvents = new ArrayList<>(); + + Event a1 = new Event(40, "a", 1.0); + Event c1 = new Event(41, "c", 2.0); + Event c2 = new Event(43, "c", 4.0); + Event d = new Event(43, "d", 4.0); + Event b1 = new Event(42, "b", 3.0); + + inputEvents.add(new StreamRecord<>(a1, 1)); + inputEvents.add(new StreamRecord<>(c1, 2)); + inputEvents.add(new StreamRecord<>(c2, 3)); + inputEvents.add(new StreamRecord<>(d, 4)); + inputEvents.add(new StreamRecord<>(b1, 5)); + + Pattern pattern = Pattern.begin("start").where(new SimpleCondition() { + private static final long serialVersionUID = 1672995058886176627L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("a"); + } + }).followedByAny("middle").where(new SimpleCondition() { + private static final long serialVersionUID = 6003621617520261554L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("c"); + } + }).followedByAny("end").where(new SimpleCondition() { + private static final long serialVersionUID = 887700237024758417L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("d"); + } + }).notNext("notPattern").where(new SimpleCondition() { + private static final long serialVersionUID = 5239529076086933032L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("b"); + } + }); + + NFA nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); + + final List> matches = feedNFA(inputEvents, nfa); + + assertEquals(0, matches.size()); + } + + @Test + public void testNotFollowedBy() { + List> inputEvents = new ArrayList<>(); + + Event a1 = new Event(40, "a", 1.0); + Event c1 = new Event(41, "c", 2.0); + Event b1 = new Event(42, "b", 3.0); + Event c2 = new Event(43, "c", 4.0); + Event d = new Event(43, "d", 4.0); + + inputEvents.add(new StreamRecord<>(a1, 1)); + inputEvents.add(new StreamRecord<>(c1, 2)); + inputEvents.add(new StreamRecord<>(b1, 3)); + inputEvents.add(new StreamRecord<>(c2, 4)); + inputEvents.add(new StreamRecord<>(d, 5)); + + Pattern pattern = Pattern.begin("start").where(new SimpleCondition() { + private static final long serialVersionUID = -2641662468313191976L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("a"); + } + }).notFollowedBy("notPattern").where(new SimpleCondition() { + private static final long serialVersionUID = -3632144132379494778L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("b"); + } + }).followedByAny("middle").where(new SimpleCondition() { + private static final long serialVersionUID = 3818766882138348167L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("c"); + } + }).followedBy("end").where(new SimpleCondition() { + private static final long serialVersionUID = 2033204730795451288L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("d"); + } + }); + + NFA nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); + + final List> matches = feedNFA(inputEvents, nfa); + + compareMaps(matches, Lists.>newArrayList( + Lists.newArrayList(a1, c1, d) + )); + } + + @Test + public void testNotFollowedByBeforeOptional() { + List> inputEvents = new ArrayList<>(); + + Event a1 = new Event(40, "a", 1.0); + Event c1 = new Event(41, "c", 2.0); + Event b1 = new Event(42, "b", 3.0); + Event c2 = new Event(43, "c", 4.0); + Event d = new Event(43, "d", 4.0); + + inputEvents.add(new StreamRecord<>(a1, 1)); + inputEvents.add(new StreamRecord<>(c1, 2)); + inputEvents.add(new StreamRecord<>(b1, 3)); + inputEvents.add(new StreamRecord<>(c2, 4)); + inputEvents.add(new StreamRecord<>(d, 5)); + + Pattern pattern = Pattern.begin("start").where(new SimpleCondition() { + private static final long serialVersionUID = -2454396370205097543L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("a"); + } + }).notFollowedBy("notPattern").where(new SimpleCondition() { + private static final long serialVersionUID = 2749547391611263290L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("b"); + } + }).followedByAny("middle").where(new SimpleCondition() { + private static final long serialVersionUID = -4989511337298217255L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("c"); + } + }).optional().followedBy("end").where(new SimpleCondition() { + private static final long serialVersionUID = -8466223836652936608L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("d"); + } + }); + + NFA nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); + + final List> matches = feedNFA(inputEvents, nfa); + + compareMaps(matches, Lists.>newArrayList( + Lists.newArrayList(a1, c1, d) + )); + } + + @Test + public void testTimesWithNotFollowedBy() { + List> inputEvents = new ArrayList<>(); + + Event a1 = new Event(40, "a", 1.0); + Event b1 = new Event(41, "b", 2.0); + Event c = new Event(42, "c", 3.0); + Event b2 = new Event(43, "b", 4.0); + Event d = new Event(43, "d", 4.0); + + inputEvents.add(new StreamRecord<>(a1, 1)); + inputEvents.add(new StreamRecord<>(b1, 2)); + inputEvents.add(new StreamRecord<>(c, 3)); + inputEvents.add(new StreamRecord<>(b2, 4)); + inputEvents.add(new StreamRecord<>(d, 5)); + + Pattern pattern = Pattern.begin("start").where(new SimpleCondition() { + private static final long serialVersionUID = -2568839911852184515L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("a"); + } + }).followedByAny("middle").where(new SimpleCondition() { + private static final long serialVersionUID = -3632232424064269636L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("b"); + } + }).times(2).notFollowedBy("notPattern").where(new SimpleCondition() { + private static final long serialVersionUID = 3685596793523534611L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("c"); + } + }).followedBy("end").where(new SimpleCondition() { + private static final long serialVersionUID = 1960758663575587243L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("d"); + } + }); + + NFA nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); + + final List> matches = feedNFA(inputEvents, nfa); + + compareMaps(matches, Lists.>newArrayList()); + } + + @Test + public void testIgnoreStateOfTimesWithNotFollowedBy() { + List> inputEvents = new ArrayList<>(); + + Event a1 = new Event(40, "a", 1.0); + Event e = new Event(41, "e", 2.0); + Event c1 = new Event(42, "c", 3.0); + Event b1 = new Event(43, "b", 4.0); + Event c2 = new Event(44, "c", 5.0); + Event d1 = new Event(45, "d", 6.0); + Event d2 = new Event(46, "d", 7.0); + + inputEvents.add(new StreamRecord<>(a1, 1)); + inputEvents.add(new StreamRecord<>(d1, 2)); + inputEvents.add(new StreamRecord<>(e, 1)); + inputEvents.add(new StreamRecord<>(b1, 3)); + inputEvents.add(new StreamRecord<>(c1, 2)); + inputEvents.add(new StreamRecord<>(c2, 4)); + inputEvents.add(new StreamRecord<>(d2, 5)); + + Pattern pattern = Pattern.begin("start").where(new SimpleCondition() { + private static final long serialVersionUID = 2814850350025111940L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("a"); + } + }).notFollowedBy("notPattern").where(new SimpleCondition() { + private static final long serialVersionUID = 4988756153568853834L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("b"); + } + }).followedByAny("middle").where(new SimpleCondition() { + private static final long serialVersionUID = -225909103322018778L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("c"); + } + }).times(2).optional().followedBy("end").where(new SimpleCondition() { + private static final long serialVersionUID = -924294627956373696L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("d"); + } + }); + + NFA nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); + + final List> matches = feedNFA(inputEvents, nfa); + + compareMaps(matches, Lists.>newArrayList( + Lists.newArrayList(a1, d1) + )); + } + + @Test + public void testTimesWithNotFollowedByAfter() { + List> inputEvents = new ArrayList<>(); + + Event a1 = new Event(40, "a", 1.0); + Event e = new Event(41, "e", 2.0); + Event c1 = new Event(42, "c", 3.0); + Event b1 = new Event(43, "b", 4.0); + Event b2 = new Event(44, "b", 5.0); + Event d1 = new Event(46, "d", 7.0); + Event d2 = new Event(47, "d", 8.0); + + inputEvents.add(new StreamRecord<>(a1, 1)); + inputEvents.add(new StreamRecord<>(d1, 2)); + inputEvents.add(new StreamRecord<>(e, 1)); + inputEvents.add(new StreamRecord<>(b1, 3)); + inputEvents.add(new StreamRecord<>(b2, 3)); + inputEvents.add(new StreamRecord<>(c1, 2)); + inputEvents.add(new StreamRecord<>(d2, 5)); + + Pattern pattern = Pattern.begin("start").where(new SimpleCondition() { + private static final long serialVersionUID = 6193105689601702341L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("a"); + } + }).followedByAny("middle").where(new SimpleCondition() { + private static final long serialVersionUID = 5195859580923169111L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("b"); + } + }).times(2).notFollowedBy("notPattern").where(new SimpleCondition() { + private static final long serialVersionUID = 4973027956103783831L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("c"); + } + }).followedBy("end").where(new SimpleCondition() { + private static final long serialVersionUID = 2724622546678984894L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("d"); + } + }); + + NFA nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); + + final List> matches = feedNFA(inputEvents, nfa); + + compareMaps(matches, Lists.>newArrayList()); + } + + @Test + public void testNotFollowedByBeforeOptionalAtTheEnd() { + List> inputEvents = new ArrayList<>(); + + Event a1 = new Event(40, "a", 1.0); + Event c1 = new Event(41, "c", 2.0); + Event b1 = new Event(42, "b", 3.0); + Event c2 = new Event(43, "c", 4.0); + + inputEvents.add(new StreamRecord<>(a1, 1)); + inputEvents.add(new StreamRecord<>(c1, 2)); + inputEvents.add(new StreamRecord<>(b1, 3)); + inputEvents.add(new StreamRecord<>(c2, 4)); + + Pattern pattern = Pattern.begin("start").where(new SimpleCondition() { + private static final long serialVersionUID = -4289351792573443294L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("a"); + } + }).notFollowedBy("notPattern").where(new SimpleCondition() { + private static final long serialVersionUID = -4989574608417523507L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("b"); + } + }).followedByAny("end").where(new SimpleCondition() { + private static final long serialVersionUID = -5940131818629290579L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("c"); + } + }).optional(); + + NFA nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); + + final List> matches = feedNFA(inputEvents, nfa); + + compareMaps(matches, Lists.>newArrayList( + Lists.newArrayList(a1, c1), + Lists.newArrayList(a1) + )); + } + + @Test + public void testNotFollowedByBeforeOptionalTimes() { + List> inputEvents = new ArrayList<>(); + + Event a1 = new Event(40, "a", 1.0); + Event c1 = new Event(41, "c", 2.0); + Event b1 = new Event(42, "b", 3.0); + Event c2 = new Event(43, "c", 4.0); + Event d = new Event(43, "d", 4.0); + + inputEvents.add(new StreamRecord<>(a1, 1)); + inputEvents.add(new StreamRecord<>(c1, 2)); + inputEvents.add(new StreamRecord<>(b1, 3)); + inputEvents.add(new StreamRecord<>(c2, 4)); + inputEvents.add(new StreamRecord<>(d, 5)); + + Pattern pattern = Pattern.begin("start").where(new SimpleCondition() { + private static final long serialVersionUID = -7885381452276160322L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("a"); + } + }).notFollowedBy("notPattern").where(new SimpleCondition() { + private static final long serialVersionUID = 3471511260235826653L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("b"); + } + }).followedByAny("middle").where(new SimpleCondition() { + private static final long serialVersionUID = 9073793782452363833L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("c"); + } + }).times(2).optional().followedBy("end").where(new SimpleCondition() { + private static final long serialVersionUID = 7972902718259767076L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("d"); + } + }); + + NFA nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); + + final List> matches = feedNFA(inputEvents, nfa); + + compareMaps(matches, Lists.>newArrayList( + Lists.newArrayList(a1, c1, c2, d) + )); + } + + @Test + public void testNotFollowedByWithBranchingAtStart() { + List> inputEvents = new ArrayList<>(); + + Event a1 = new Event(40, "a", 1.0); + Event b1 = new Event(42, "b", 3.0); + Event c1 = new Event(41, "c", 2.0); + Event a2 = new Event(41, "a", 4.0); + Event c2 = new Event(43, "c", 5.0); + Event d = new Event(43, "d", 6.0); + + inputEvents.add(new StreamRecord<>(a1, 1)); + inputEvents.add(new StreamRecord<>(b1, 2)); + inputEvents.add(new StreamRecord<>(c1, 3)); + inputEvents.add(new StreamRecord<>(a2, 4)); + inputEvents.add(new StreamRecord<>(c2, 5)); + inputEvents.add(new StreamRecord<>(d, 6)); + + Pattern pattern = Pattern.begin("start").where(new SimpleCondition() { + private static final long serialVersionUID = -7866220136345465444L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("a"); + } + }).notFollowedBy("notPattern").where(new SimpleCondition() { + private static final long serialVersionUID = 4957837489028234932L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("b"); + } + }).followedBy("middle").where(new SimpleCondition() { + private static final long serialVersionUID = 5569569968862808007L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("c"); + } + }).followedBy("end").where(new SimpleCondition() { + private static final long serialVersionUID = -8579678167937416269L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("d"); + } + }); + + NFA nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); + + final List> matches = feedNFA(inputEvents, nfa); + + compareMaps(matches, Lists.>newArrayList( + Lists.newArrayList(a2, c2, d) + )); + } + + private static class NotFollowByData { + static final Event A_1 = new Event(40, "a", 1.0); + static final Event B_1 = new Event(41, "b", 2.0); + static final Event B_2 = new Event(42, "b", 3.0); + static final Event B_3 = new Event(42, "b", 4.0); + static final Event C_1 = new Event(43, "c", 5.0); + static final Event B_4 = new Event(42, "b", 6.0); + static final Event B_5 = new Event(42, "b", 7.0); + static final Event B_6 = new Event(42, "b", 8.0); + static final Event D_1 = new Event(43, "d", 9.0); + + private NotFollowByData() { + } + } + + @Test + public void testNotNextAfterOneOrMoreSkipTillNext() { + final List> matches = testNotNextAfterOneOrMore(false); + assertEquals(0, matches.size()); + } + + @Test + public void testNotNextAfterOneOrMoreSkipTillAny() { + final List> matches = testNotNextAfterOneOrMore(true); + compareMaps(matches, Lists.>newArrayList( + Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_2, NotFollowByData.D_1) + )); + } + + private List> testNotNextAfterOneOrMore(boolean allMatches) { + List> inputEvents = new ArrayList<>(); + + int i = 0; + inputEvents.add(new StreamRecord<>(NotFollowByData.A_1, i++)); + inputEvents.add(new StreamRecord<>(NotFollowByData.B_1, i++)); + inputEvents.add(new StreamRecord<>(NotFollowByData.C_1, i++)); + inputEvents.add(new StreamRecord<>(NotFollowByData.B_2, i++)); + inputEvents.add(new StreamRecord<>(NotFollowByData.D_1, i++)); + + Pattern pattern = Pattern + .begin("a").where(new SimpleCondition() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("a"); + } + }); + + pattern = (allMatches ? pattern.followedByAny("b*") : pattern.followedBy("b*")).where(new SimpleCondition() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("b"); + } + }).oneOrMore() + .notNext("not c").where(new SimpleCondition() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("c"); + } + }) + .followedBy("d").where(new SimpleCondition() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("d"); + } + }); + + NFA nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); + + return feedNFA(inputEvents, nfa); + } + + @Test + public void testNotFollowedByNextAfterOneOrMoreEager() { + final List> matches = testNotFollowedByAfterOneOrMore(true, false); + assertEquals(0, matches.size()); + } + + @Test + public void testNotFollowedByAnyAfterOneOrMoreEager() { + final List> matches = testNotFollowedByAfterOneOrMore(true, true); + compareMaps(matches, Lists.>newArrayList( + Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_4, NotFollowByData.B_5, NotFollowByData.B_6, NotFollowByData.D_1), + Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_4, NotFollowByData.B_5, NotFollowByData.D_1), + Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_4, NotFollowByData.D_1), + Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_5, NotFollowByData.B_6, NotFollowByData.D_1), + Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_5, NotFollowByData.D_1), + Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_6, NotFollowByData.D_1) + )); + } + + @Test + public void testNotFollowedByNextAfterOneOrMoreCombinations() { + final List> matches = testNotFollowedByAfterOneOrMore(false, false); + assertEquals(0, matches.size()); + } + + @Test + public void testNotFollowedByAnyAfterOneOrMoreCombinations() { + final List> matches = testNotFollowedByAfterOneOrMore(false, true); + compareMaps(matches, Lists.>newArrayList( + Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_4, NotFollowByData.B_5, NotFollowByData.B_6, NotFollowByData.D_1), + Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_4, NotFollowByData.B_5, NotFollowByData.D_1), + Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_4, NotFollowByData.B_6, NotFollowByData.D_1), + Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_4, NotFollowByData.D_1), + Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_5, NotFollowByData.B_6, NotFollowByData.D_1), + Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_5, NotFollowByData.D_1), + Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_6, NotFollowByData.D_1) + )); + } + + private List> testNotFollowedByAfterOneOrMore(boolean eager, boolean allMatches) { + List> inputEvents = new ArrayList<>(); + + int i = 0; + inputEvents.add(new StreamRecord<>(NotFollowByData.A_1, i++)); + inputEvents.add(new StreamRecord<>(NotFollowByData.B_1, i++)); + inputEvents.add(new StreamRecord<>(NotFollowByData.B_2, i++)); + inputEvents.add(new StreamRecord<>(NotFollowByData.B_3, i++)); + inputEvents.add(new StreamRecord<>(NotFollowByData.C_1, i++)); + inputEvents.add(new StreamRecord<>(NotFollowByData.B_4, i++)); + inputEvents.add(new StreamRecord<>(NotFollowByData.B_5, i++)); + inputEvents.add(new StreamRecord<>(NotFollowByData.B_6, i++)); + inputEvents.add(new StreamRecord<>(NotFollowByData.D_1, i)); + + Pattern pattern = Pattern + .begin("a").where(new SimpleCondition() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("a"); + } + }); + + pattern = (allMatches ? pattern.followedByAny("b*") : pattern.followedBy("b*")) + .where(new SimpleCondition() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("b"); + } + }); + + pattern = (eager ? pattern.oneOrMore() : pattern.oneOrMore().allowCombinations()) + .notFollowedBy("not c").where(new SimpleCondition() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("c"); + } + }) + .followedBy("d").where(new SimpleCondition() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("d"); + } + }); + + NFA nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); + + return feedNFA(inputEvents, nfa); + } + + @Test + public void testNotFollowedByAnyBeforeOneOrMoreEager() { + final List> matches = testNotFollowedByBeforeOneOrMore(true, true); + + compareMaps(matches, Lists.>newArrayList( + Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.B_4, NotFollowByData.B_5, NotFollowByData.B_6, NotFollowByData.D_1), + Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.B_4, NotFollowByData.B_5, NotFollowByData.D_1), + Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.B_4, NotFollowByData.D_1), + Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.D_1) + )); + } + + @Test + public void testNotFollowedByAnyBeforeOneOrMoreCombinations() { + final List> matches = testNotFollowedByBeforeOneOrMore(false, true); + + compareMaps(matches, Lists.>newArrayList( + Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.B_4, NotFollowByData.B_5, NotFollowByData.B_6, NotFollowByData.D_1), + Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.B_4, NotFollowByData.B_6, NotFollowByData.D_1), + Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.B_4, NotFollowByData.B_5, NotFollowByData.D_1), + Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.B_4, NotFollowByData.D_1), + Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.B_5, NotFollowByData.B_6, NotFollowByData.D_1), + Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.B_5, NotFollowByData.D_1), + Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.B_6, NotFollowByData.D_1), + Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.D_1) + )); + } + + @Test + public void testNotFollowedByBeforeOneOrMoreEager() { + final List> matches = testNotFollowedByBeforeOneOrMore(true, false); + + compareMaps(matches, Lists.>newArrayList( + Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.B_4, NotFollowByData.B_5, NotFollowByData.B_6, NotFollowByData.D_1), + Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.B_4, NotFollowByData.B_5, NotFollowByData.D_1), + Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.B_4, NotFollowByData.D_1), + Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.D_1) + )); + } + + @Test + public void testNotFollowedByBeforeOneOrMoreCombinations() { + final List> matches = testNotFollowedByBeforeOneOrMore(false, false); + + compareMaps(matches, Lists.>newArrayList( + Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.B_4, NotFollowByData.B_5, NotFollowByData.B_6, NotFollowByData.D_1), + Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.B_4, NotFollowByData.B_6, NotFollowByData.D_1), + Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.B_4, NotFollowByData.B_5, NotFollowByData.D_1), + Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.B_4, NotFollowByData.D_1), + Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.B_5, NotFollowByData.B_6, NotFollowByData.D_1), + Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.B_5, NotFollowByData.D_1), + Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.B_6, NotFollowByData.D_1), + Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.D_1) + )); + } + + private List> testNotFollowedByBeforeOneOrMore(boolean eager, boolean allMatches) { + List> inputEvents = new ArrayList<>(); + + int i = 0; + inputEvents.add(new StreamRecord<>(NotFollowByData.A_1, i++)); + inputEvents.add(new StreamRecord<>(NotFollowByData.B_1, i++)); + inputEvents.add(new StreamRecord<>(NotFollowByData.C_1, i++)); + inputEvents.add(new StreamRecord<>(NotFollowByData.B_4, i++)); + inputEvents.add(new StreamRecord<>(NotFollowByData.B_5, i++)); + inputEvents.add(new StreamRecord<>(NotFollowByData.B_6, i++)); + inputEvents.add(new StreamRecord<>(NotFollowByData.D_1, i)); + + Pattern pattern = Pattern + .begin("a").where(new SimpleCondition() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("a"); + } + }) + .notFollowedBy("not c").where(new SimpleCondition() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("c"); + } + }); + + pattern = (allMatches ? pattern.followedByAny("b*") : pattern.followedBy("b*")) + .where(new SimpleCondition() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("b"); + } + }).oneOrMore(); + + pattern = (eager ? pattern : pattern.allowCombinations()) + .followedBy("d").where(new SimpleCondition() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("d"); + } + }); + + NFA nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); + + return feedNFA(inputEvents, nfa); + } + + @Test + public void testNotFollowedByBeforeZeroOrMoreEagerSkipTillNext() { + final List> matches = testNotFollowedByBeforeZeroOrMore(true, false); + compareMaps(matches, Lists.>newArrayList( + Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.B_4, NotFollowByData.B_5, NotFollowByData.B_6, NotFollowByData.D_1), + Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.B_4, NotFollowByData.B_5, NotFollowByData.D_1), + Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.B_4, NotFollowByData.D_1), + Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.D_1) + )); + } + + @Test + public void testNotFollowedByBeforeZeroOrMoreCombinationsSkipTillNext() { + final List> matches = testNotFollowedByBeforeZeroOrMore(false, false); + compareMaps(matches, Lists.>newArrayList( + Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.B_4, NotFollowByData.B_5, NotFollowByData.B_6, NotFollowByData.D_1), + Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.B_4, NotFollowByData.B_5, NotFollowByData.D_1), + Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.B_4, NotFollowByData.B_6, NotFollowByData.D_1), + Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.B_4, NotFollowByData.D_1), + Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.D_1), + Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.B_5, NotFollowByData.B_6, NotFollowByData.D_1), + Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.B_5, NotFollowByData.D_1), + Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.B_6, NotFollowByData.D_1) + )); + } + + @Test + public void testNotFollowedByBeforeZeroOrMoreEagerSkipTillAny() { + final List> matches = testNotFollowedByBeforeZeroOrMore(true, true); + compareMaps(matches, Lists.>newArrayList( + Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.B_4, NotFollowByData.B_5, NotFollowByData.B_6, NotFollowByData.D_1), + Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.B_4, NotFollowByData.B_5, NotFollowByData.D_1), + Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.B_4, NotFollowByData.D_1), + Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.D_1) + )); + } + + @Test + public void testNotFollowedByBeforeZeroOrMoreCombinationsSkipTillAny() { + final List> matches = testNotFollowedByBeforeZeroOrMore(false, true); + compareMaps(matches, Lists.>newArrayList( + Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.B_4, NotFollowByData.B_5, NotFollowByData.B_6, NotFollowByData.D_1), + Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.B_4, NotFollowByData.B_5, NotFollowByData.D_1), + Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.B_4, NotFollowByData.B_6, NotFollowByData.D_1), + Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.B_4, NotFollowByData.D_1), + Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.D_1), + Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.B_5, NotFollowByData.B_6, NotFollowByData.D_1), + Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.B_5, NotFollowByData.D_1), + Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.B_6, NotFollowByData.D_1) + )); + } + + private List> testNotFollowedByBeforeZeroOrMore(boolean eager, boolean allMatches) { + List> inputEvents = new ArrayList<>(); + + int i = 0; + inputEvents.add(new StreamRecord<>(NotFollowByData.A_1, i++)); + inputEvents.add(new StreamRecord<>(NotFollowByData.B_1, i++)); + inputEvents.add(new StreamRecord<>(NotFollowByData.C_1, i++)); + inputEvents.add(new StreamRecord<>(NotFollowByData.B_4, i++)); + inputEvents.add(new StreamRecord<>(NotFollowByData.B_5, i++)); + inputEvents.add(new StreamRecord<>(NotFollowByData.B_6, i++)); + inputEvents.add(new StreamRecord<>(NotFollowByData.D_1, i)); + + Pattern pattern = Pattern + .begin("a").where(new SimpleCondition() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("a"); + } + }) + .notFollowedBy("not c").where(new SimpleCondition() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("c"); + } + }); + + pattern = (allMatches ? pattern.followedByAny("b*") : pattern.followedBy("b*")) + .where(new SimpleCondition() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("b"); + } + }).oneOrMore().optional(); + + pattern = (eager ? pattern : pattern.allowCombinations()) + .followedBy("d").where(new SimpleCondition() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("d"); + } + }); + + NFA nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); + + return feedNFA(inputEvents, nfa); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/c9e574bf/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SameElementITCase.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SameElementITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SameElementITCase.java new file mode 100644 index 0000000..d378a74 --- /dev/null +++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SameElementITCase.java @@ -0,0 +1,407 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cep.nfa; + +import org.apache.flink.cep.Event; +import org.apache.flink.cep.nfa.compiler.NFACompiler; +import org.apache.flink.cep.pattern.Pattern; +import org.apache.flink.cep.pattern.conditions.IterativeCondition; +import org.apache.flink.cep.pattern.conditions.SimpleCondition; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.util.TestLogger; + +import com.google.common.collect.Lists; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; + +import static org.apache.flink.cep.nfa.NFATestUtilities.compareMaps; +import static org.apache.flink.cep.nfa.NFATestUtilities.feedNFA; + +/** + * Tests for handling Events that are equal in case of {@link Object#equals(Object)} and have same timestamps. + */ +@SuppressWarnings("unchecked") +public class SameElementITCase extends TestLogger { + + @Test + public void testEagerZeroOrMoreSameElement() { + List> inputEvents = new ArrayList<>(); + + Event startEvent = new Event(40, "c", 1.0); + Event middleEvent1 = new Event(41, "a", 2.0); + Event middleEvent2 = new Event(42, "a", 3.0); + Event middleEvent3 = new Event(43, "a", 4.0); + Event end1 = new Event(44, "b", 5.0); + + inputEvents.add(new StreamRecord<>(startEvent, 1)); + inputEvents.add(new StreamRecord<>(middleEvent1, 3)); + inputEvents.add(new StreamRecord<>(middleEvent1, 3)); + inputEvents.add(new StreamRecord<>(middleEvent1, 3)); + inputEvents.add(new StreamRecord<>(middleEvent2, 4)); + inputEvents.add(new StreamRecord<>(new Event(50, "d", 6.0), 5)); + inputEvents.add(new StreamRecord<>(middleEvent3, 6)); + inputEvents.add(new StreamRecord<>(middleEvent3, 6)); + inputEvents.add(new StreamRecord<>(end1, 7)); + + Pattern pattern = Pattern.begin("start").where(new SimpleCondition() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("c"); + } + }).followedBy("middle").where(new SimpleCondition() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("a"); + } + }).oneOrMore().optional().followedBy("end1").where(new SimpleCondition() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("b"); + } + }); + + NFA nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); + + final List> resultingPatterns = feedNFA(inputEvents, nfa); + + compareMaps(resultingPatterns, Lists.>newArrayList( + Lists.newArrayList(startEvent, middleEvent1, middleEvent1, middleEvent1, middleEvent2, middleEvent3, middleEvent3, end1), + Lists.newArrayList(startEvent, middleEvent1, middleEvent1, middleEvent1, middleEvent2, middleEvent3, end1), + Lists.newArrayList(startEvent, middleEvent1, middleEvent1, middleEvent1, middleEvent2, end1), + Lists.newArrayList(startEvent, middleEvent1, middleEvent1, middleEvent1, end1), + Lists.newArrayList(startEvent, middleEvent1, middleEvent1, end1), + Lists.newArrayList(startEvent, middleEvent1, end1), + Lists.newArrayList(startEvent, end1) + )); + } + + @Test + public void testZeroOrMoreSameElement() { + List> inputEvents = new ArrayList<>(); + + Event startEvent = new Event(40, "c", 1.0); + Event middleEvent1 = new Event(41, "a", 2.0); + Event middleEvent1a = new Event(41, "a", 2.0); + Event middleEvent2 = new Event(42, "a", 3.0); + Event middleEvent3 = new Event(43, "a", 4.0); + Event middleEvent3a = new Event(43, "a", 4.0); + Event end1 = new Event(44, "b", 5.0); + + inputEvents.add(new StreamRecord<>(startEvent, 1)); + inputEvents.add(new StreamRecord<>(middleEvent1, 3)); + inputEvents.add(new StreamRecord<>(middleEvent1a, 3)); + inputEvents.add(new StreamRecord<>(middleEvent2, 4)); + inputEvents.add(new StreamRecord<>(new Event(50, "d", 6.0), 5)); + inputEvents.add(new StreamRecord<>(middleEvent3, 6)); + inputEvents.add(new StreamRecord<>(middleEvent3a, 6)); + inputEvents.add(new StreamRecord<>(end1, 7)); + + Pattern pattern = Pattern.begin("start").where(new SimpleCondition() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("c"); + } + }).followedByAny("middle").where(new SimpleCondition() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("a"); + } + }).oneOrMore().optional().allowCombinations().followedByAny("end1").where(new SimpleCondition() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("b"); + } + }); + + NFA nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); + + final List> resultingPatterns = feedNFA(inputEvents, nfa); + + compareMaps(resultingPatterns, Lists.>newArrayList( + Lists.newArrayList(startEvent, middleEvent1, middleEvent1a, middleEvent2, middleEvent3, middleEvent3a, end1), + + Lists.newArrayList(startEvent, middleEvent1, middleEvent1a, middleEvent2, middleEvent3, end1), + Lists.newArrayList(startEvent, middleEvent1, middleEvent1a, middleEvent2, middleEvent3a, end1), + Lists.newArrayList(startEvent, middleEvent1, middleEvent1a, middleEvent3, middleEvent3a, end1), + Lists.newArrayList(startEvent, middleEvent1, middleEvent2, middleEvent3, middleEvent3a, end1), + Lists.newArrayList(startEvent, middleEvent1a, middleEvent2, middleEvent3, middleEvent3a, end1), + + Lists.newArrayList(startEvent, middleEvent1, middleEvent1a, middleEvent2, end1), + Lists.newArrayList(startEvent, middleEvent1, middleEvent1a, middleEvent3, end1), + Lists.newArrayList(startEvent, middleEvent1, middleEvent1a, middleEvent3a, end1), + Lists.newArrayList(startEvent, middleEvent1, middleEvent2, middleEvent3, end1), + Lists.newArrayList(startEvent, middleEvent1, middleEvent2, middleEvent3a, end1), + Lists.newArrayList(startEvent, middleEvent1, middleEvent3, middleEvent3a, end1), + Lists.newArrayList(startEvent, middleEvent2, middleEvent3, middleEvent3a, end1), + Lists.newArrayList(startEvent, middleEvent1a, middleEvent2, middleEvent3, end1), + Lists.newArrayList(startEvent, middleEvent1a, middleEvent2, middleEvent3a, end1), + Lists.newArrayList(startEvent, middleEvent1a, middleEvent3, middleEvent3a, end1), + + Lists.newArrayList(startEvent, middleEvent1, middleEvent1, end1), + Lists.newArrayList(startEvent, middleEvent1, middleEvent2, end1), + Lists.newArrayList(startEvent, middleEvent1, middleEvent3, end1), + Lists.newArrayList(startEvent, middleEvent1, middleEvent3a, end1), + Lists.newArrayList(startEvent, middleEvent1a, middleEvent2, end1), + Lists.newArrayList(startEvent, middleEvent1a, middleEvent3, end1), + Lists.newArrayList(startEvent, middleEvent1a, middleEvent3a, end1), + Lists.newArrayList(startEvent, middleEvent2, middleEvent3, end1), + Lists.newArrayList(startEvent, middleEvent2, middleEvent3a, end1), + Lists.newArrayList(startEvent, middleEvent3, middleEvent3a, end1), + + Lists.newArrayList(startEvent, middleEvent1, end1), + Lists.newArrayList(startEvent, middleEvent1a, end1), + Lists.newArrayList(startEvent, middleEvent2, end1), + Lists.newArrayList(startEvent, middleEvent3, end1), + Lists.newArrayList(startEvent, middleEvent3a, end1), + + Lists.newArrayList(startEvent, end1) + )); + } + + @Test + public void testSimplePatternWSameElement() throws Exception { + List> inputEvents = new ArrayList<>(); + + Event startEvent = new Event(40, "c", 1.0); + Event middleEvent1 = new Event(41, "a", 2.0); + Event end1 = new Event(44, "b", 5.0); + + inputEvents.add(new StreamRecord<>(startEvent, 1)); + inputEvents.add(new StreamRecord<>(middleEvent1, 3)); + inputEvents.add(new StreamRecord<>(middleEvent1, 3)); + inputEvents.add(new StreamRecord<>(end1, 7)); + + Pattern pattern = Pattern.begin("start").where(new SimpleCondition() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("c"); + } + }).followedByAny("middle").where(new SimpleCondition() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("a"); + } + }).followedBy("end1").where(new SimpleCondition() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("b"); + } + }); + + NFA nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); + + final List> resultingPatterns = feedNFA(inputEvents, nfa); + + compareMaps(resultingPatterns, Lists.>newArrayList( + Lists.newArrayList(startEvent, middleEvent1, end1), + Lists.newArrayList(startEvent, middleEvent1, end1) + )); + } + + @Test + public void testIterativeConditionWSameElement() throws Exception { + List> inputEvents = new ArrayList<>(); + + Event startEvent = new Event(40, "c", 1.0); + Event middleEvent1 = new Event(41, "a", 2.0); + Event middleEvent1a = new Event(41, "a", 2.0); + Event middleEvent1b = new Event(41, "a", 2.0); + final Event end = new Event(44, "b", 5.0); + + inputEvents.add(new StreamRecord<>(startEvent, 1)); + inputEvents.add(new StreamRecord<>(middleEvent1, 3)); + inputEvents.add(new StreamRecord<>(middleEvent1a, 3)); + inputEvents.add(new StreamRecord<>(middleEvent1b, 3)); + inputEvents.add(new StreamRecord<>(end, 7)); + + Pattern pattern = Pattern.begin("start").where(new SimpleCondition() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("c"); + } + }).followedByAny("middle").where(new SimpleCondition() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("a"); + } + }).oneOrMore().optional().allowCombinations().followedBy("end").where(new IterativeCondition() { + + private static final long serialVersionUID = -5566639743229703237L; + + @Override + public boolean filter(Event value, Context ctx) throws Exception { + double sum = 0.0; + for (Event event: ctx.getEventsForPattern("middle")) { + sum += event.getPrice(); + } + return Double.compare(sum, 4.0) == 0; + } + + }); + + NFA nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); + + final List> resultingPatterns = feedNFA(inputEvents, nfa); + + compareMaps(resultingPatterns, Lists.>newArrayList( + Lists.newArrayList(startEvent, middleEvent1, middleEvent1a, end), + Lists.newArrayList(startEvent, middleEvent1, middleEvent1a, middleEvent1b), + Lists.newArrayList(startEvent, middleEvent1a, middleEvent1b, end) + )); + } + + @Test + public void testEndWLoopingWSameElement() throws Exception { + List> inputEvents = new ArrayList<>(); + + Event startEvent = new Event(40, "c", 1.0); + Event middleEvent1 = new Event(41, "a", 2.0); + Event middleEvent1a = new Event(41, "a", 2.0); + Event middleEvent1b = new Event(41, "a", 2.0); + final Event end = new Event(44, "b", 5.0); + + inputEvents.add(new StreamRecord<>(startEvent, 1)); + inputEvents.add(new StreamRecord<>(middleEvent1, 3)); + inputEvents.add(new StreamRecord<>(middleEvent1a, 3)); + inputEvents.add(new StreamRecord<>(middleEvent1b, 3)); + inputEvents.add(new StreamRecord<>(end, 7)); + + Pattern pattern = Pattern.begin("start").where(new SimpleCondition() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("c"); + } + }).followedByAny("middle").where(new SimpleCondition() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("a"); + } + }).oneOrMore().optional(); + + NFA nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); + + final List> resultingPatterns = feedNFA(inputEvents, nfa); + + compareMaps(resultingPatterns, Lists.>newArrayList( + Lists.newArrayList(startEvent), + Lists.newArrayList(startEvent, middleEvent1), + Lists.newArrayList(startEvent, middleEvent1a), + Lists.newArrayList(startEvent, middleEvent1b), + Lists.newArrayList(startEvent, middleEvent1, middleEvent1a), + Lists.newArrayList(startEvent, middleEvent1a, middleEvent1b), + Lists.newArrayList(startEvent, middleEvent1, middleEvent1a, middleEvent1b) + )); + } + + @Test + public void testRepeatingPatternWSameElement() throws Exception { + List> inputEvents = new ArrayList<>(); + + Event startEvent = new Event(40, "c", 1.0); + Event middle1Event1 = new Event(40, "a", 2.0); + Event middle1Event2 = new Event(40, "a", 3.0); + Event middle1Event3 = new Event(40, "a", 4.0); + Event middle2Event1 = new Event(40, "b", 5.0); + + inputEvents.add(new StreamRecord<>(startEvent, 1)); + inputEvents.add(new StreamRecord<>(middle1Event1, 3)); + inputEvents.add(new StreamRecord<>(middle1Event1, 3)); + inputEvents.add(new StreamRecord<>(middle1Event2, 3)); + inputEvents.add(new StreamRecord<>(new Event(40, "d", 6.0), 5)); + inputEvents.add(new StreamRecord<>(middle2Event1, 6)); + inputEvents.add(new StreamRecord<>(middle1Event3, 7)); + + Pattern pattern = Pattern.begin("start").where(new SimpleCondition() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("c"); + } + }).followedBy("middle1").where(new SimpleCondition() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("a"); + } + }).oneOrMore().optional().followedBy("middle2").where(new SimpleCondition() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("b"); + } + }).optional().followedBy("end").where(new SimpleCondition() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("a"); + } + }); + + NFA nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); + + final List> resultingPatterns = feedNFA(inputEvents, nfa); + + compareMaps(resultingPatterns, Lists.>newArrayList( + Lists.newArrayList(startEvent, middle1Event1), + + Lists.newArrayList(startEvent, middle1Event1, middle1Event1), + Lists.newArrayList(startEvent, middle2Event1, middle1Event3), + + Lists.newArrayList(startEvent, middle1Event1, middle1Event1, middle1Event2), + Lists.newArrayList(startEvent, middle1Event1, middle2Event1, middle1Event3), + + Lists.newArrayList(startEvent, middle1Event1, middle1Event1, middle1Event2, middle1Event3), + Lists.newArrayList(startEvent, middle1Event1, middle1Event1, middle2Event1, middle1Event3), + + Lists.newArrayList(startEvent, middle1Event1, middle1Event1, middle1Event2, middle2Event1, middle1Event3) + )); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/c9e574bf/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SharedBufferTest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SharedBufferTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SharedBufferTest.java index bd828b6..44033c1 100644 --- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SharedBufferTest.java +++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SharedBufferTest.java @@ -18,20 +18,19 @@ package org.apache.flink.cep.nfa; -import com.google.common.collect.ArrayListMultimap; -import com.google.common.collect.ListMultimap; import org.apache.flink.api.common.typeutils.base.StringSerializer; import org.apache.flink.cep.Event; import org.apache.flink.core.memory.DataInputViewStreamWrapper; import org.apache.flink.core.memory.DataOutputViewStreamWrapper; import org.apache.flink.util.TestLogger; + +import com.google.common.collect.ArrayListMultimap; +import com.google.common.collect.ListMultimap; import org.junit.Test; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; import java.util.Collection; import java.util.Collections; @@ -39,6 +38,9 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; +/** + * Tests for {@link SharedBuffer}. + */ public class SharedBufferTest extends TestLogger { @Test http://git-wip-us.apache.org/repos/asf/flink/blob/c9e574bf/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java index 26b8ce9..cd12071 100644 --- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java +++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java @@ -32,6 +32,7 @@ import org.apache.flink.cep.pattern.MalformedPatternException; import org.apache.flink.cep.pattern.Pattern; import org.apache.flink.cep.pattern.conditions.SimpleCondition; import org.apache.flink.util.TestLogger; + import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; @@ -45,6 +46,9 @@ import static com.google.common.collect.Sets.newHashSet; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; +/** + * Tests for {@link NFACompiler}. + */ public class NFACompilerTest extends TestLogger { private static final SimpleCondition startFilter = new SimpleCondition() { @@ -116,7 +120,7 @@ public class NFACompilerTest extends TestLogger { } /** - * Tests that the NFACompiler generates the correct NFA from a given Pattern + * Tests that the NFACompiler generates the correct NFA from a given Pattern. */ @Test public void testNFACompilerWithSimplePattern() { http://git-wip-us.apache.org/repos/asf/flink/blob/c9e574bf/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPFrom12MigrationTest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPFrom12MigrationTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPFrom12MigrationTest.java index 0345192..f5a909b 100644 --- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPFrom12MigrationTest.java +++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPFrom12MigrationTest.java @@ -15,6 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.cep.operator; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; @@ -33,6 +34,7 @@ import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles; import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness; import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; import org.apache.flink.streaming.util.OperatorSnapshotUtil; + import org.junit.Ignore; import org.junit.Test; @@ -463,7 +465,6 @@ public class CEPFrom12MigrationTest { } } - @Test public void testSinglePatternAfterMigration() throws Exception { http://git-wip-us.apache.org/repos/asf/flink/blob/c9e574bf/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java index c92f772..69ba42f 100644 --- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java +++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java @@ -35,6 +35,7 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles; import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness; import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; + import org.junit.Test; import java.net.URL; @@ -45,6 +46,9 @@ import java.util.concurrent.ConcurrentLinkedQueue; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; +/** + * Tests for migration from 1.1.x to 1.3.x. + */ public class CEPMigration11to13Test { private static String getResourceFilename(String filename) { @@ -198,7 +202,7 @@ public class CEPMigration11to13Test { final Event startEvent = new Event(42, "start", 1.0); final SubEvent middleEvent = new SubEvent(42, "foo", 1.0, 10.0); - final Event endEvent= new Event(42, "end", 1.0); + final Event endEvent = new Event(42, "end", 1.0); // uncomment these lines for regenerating the snapshot on Flink 1.1 /* http://git-wip-us.apache.org/repos/asf/flink/blob/c9e574bf/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java index 38ad0f1..d83c191 100644 --- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java +++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java @@ -18,7 +18,6 @@ package org.apache.flink.cep.operator; -import com.google.common.collect.Lists; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeutils.base.IntSerializer; @@ -42,13 +41,13 @@ import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness; import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; import org.apache.flink.types.Either; import org.apache.flink.util.TestLogger; + +import com.google.common.collect.Lists; import org.junit.Assert; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; -import static org.junit.Assert.*; - import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; @@ -57,6 +56,12 @@ import java.util.List; import java.util.Map; import java.util.Queue; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +/** + * Tests for {@link KeyedCEPPatternOperator} and {@link TimeoutKeyedCEPPatternOperator}. + */ public class CEPOperatorTest extends TestLogger { @Rule @@ -269,7 +274,7 @@ public class CEPOperatorTest extends TestLogger { assertTrue(resultObject instanceof StreamRecord); StreamRecord>, Long>, Map>>> streamRecord = - (StreamRecord>,Long>,Map>>>) resultObject; + (StreamRecord>, Long>, Map>>>) resultObject; assertTrue(streamRecord.getValue() instanceof Either.Left); @@ -299,8 +304,8 @@ public class CEPOperatorTest extends TestLogger { SubEvent middleEvent1 = new SubEvent(42, "foo1", 1.0, 10.0); SubEvent middleEvent2 = new SubEvent(42, "foo2", 1.0, 10.0); SubEvent middleEvent3 = new SubEvent(42, "foo3", 1.0, 10.0); - Event endEvent1 = new Event(42, "end", 1.0); - Event endEvent2 = new Event(42, "end", 2.0); + Event endEvent1 = new Event(42, "end", 1.0); + Event endEvent2 = new Event(42, "end", 2.0); Event startEventK2 = new Event(43, "start", 1.0); @@ -493,8 +498,8 @@ public class CEPOperatorTest extends TestLogger { SubEvent middleEvent1 = new SubEvent(42, "foo1", 1.0, 10.0); SubEvent middleEvent2 = new SubEvent(42, "foo2", 1.0, 10.0); SubEvent middleEvent3 = new SubEvent(42, "foo3", 1.0, 10.0); - Event endEvent1 = new Event(42, "end", 1.0); - Event endEvent2 = new Event(42, "end", 2.0); + Event endEvent1 = new Event(42, "end", 1.0); + Event endEvent2 = new Event(42, "end", 2.0); Event startEventK2 = new Event(43, "start", 1.0); @@ -747,7 +752,6 @@ public class CEPOperatorTest extends TestLogger { Assert.assertArrayEquals(expected.toArray(), actual.toArray()); } - private class ListEventComparator implements Comparator> { @Override http://git-wip-us.apache.org/repos/asf/flink/blob/c9e574bf/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPRescalingTest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPRescalingTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPRescalingTest.java index 86be09c..40514df 100644 --- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPRescalingTest.java +++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPRescalingTest.java @@ -35,6 +35,7 @@ import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles; import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness; import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness; import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; + import org.junit.Test; import java.util.List; @@ -44,6 +45,9 @@ import java.util.Queue; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; +/** + * Tests for rescaling of CEP operators. + */ public class CEPRescalingTest { @Test @@ -64,7 +68,7 @@ public class CEPRescalingTest { Event startEvent1 = new Event(7, "start", 1.0); SubEvent middleEvent1 = new SubEvent(7, "foo", 1.0, 10.0); - Event endEvent1= new Event(7, "end", 1.0); + Event endEvent1 = new Event(7, "end", 1.0); int keygroup = KeyGroupRangeAssignment.assignToKeyGroup(keySelector.getKey(startEvent1), maxParallelism); assertEquals(1, keygroup); http://git-wip-us.apache.org/repos/asf/flink/blob/c9e574bf/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/pattern/PatternTest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/pattern/PatternTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/pattern/PatternTest.java index b6fb484..e00384b 100644 --- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/pattern/PatternTest.java +++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/pattern/PatternTest.java @@ -25,15 +25,23 @@ import org.apache.flink.cep.pattern.conditions.OrCondition; import org.apache.flink.cep.pattern.conditions.SimpleCondition; import org.apache.flink.cep.pattern.conditions.SubtypeCondition; import org.apache.flink.util.TestLogger; + import org.junit.Test; -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +/** + * Tests for constructing {@link Pattern}. + */ public class PatternTest extends TestLogger { + /** - * These test simply test that the pattern construction completes without failure + * These test simply test that the pattern construction completes without failure. */ - @Test public void testStrictContiguity() { Pattern pattern = Pattern.begin("start").next("next").next("end");