Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 4A63B200C87 for ; Wed, 17 May 2017 14:42:45 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 48B55160BB5; Wed, 17 May 2017 12:42:45 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 4A40B160BBA for ; Wed, 17 May 2017 14:42:42 +0200 (CEST) Received: (qmail 42579 invoked by uid 500); 17 May 2017 12:42:41 -0000 Mailing-List: contact commits-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.apache.org Delivered-To: mailing list commits@flink.apache.org Received: (qmail 42527 invoked by uid 99); 17 May 2017 12:42:41 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 17 May 2017 12:42:41 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 1634ADFFB5; Wed, 17 May 2017 12:42:41 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: kkloudas@apache.org To: commits@flink.apache.org Date: Wed, 17 May 2017 12:42:41 -0000 Message-Id: <4cb98902bd4a480d900b5352f722946c@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [1/9] flink git commit: [FLINK-6371] [cep] NFA return matched patterns as Map>. archived-at: Wed, 17 May 2017 12:42:45 -0000 Repository: flink Updated Branches: refs/heads/release-1.3 fe1316b33 -> 849dd9d85 http://git-wip-us.apache.org/repos/asf/flink/blob/fa64a60f/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java index 2cc67e5..46e2fd4 100644 --- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java +++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java @@ -19,7 +19,6 @@ package org.apache.flink.cep.nfa; import com.google.common.collect.Lists; -import com.google.common.collect.Sets; import com.google.common.primitives.Doubles; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.cep.Event; @@ -156,22 +155,11 @@ public class NFAITCase extends TestLogger { NFA nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); - List> resultingPatterns = new ArrayList<>(); - - for (StreamRecord inputEvent: inputEvents) { - Collection> patterns = nfa.process( - inputEvent.getValue(), - inputEvent.getTimestamp()).f0; - - resultingPatterns.addAll(patterns); - } - - assertEquals(1, resultingPatterns.size()); - Map patternMap = resultingPatterns.get(0); + List> resultingPatterns = feedNFA(inputEvents, nfa); - assertEquals(startEvent, patternMap.get("start")); - assertEquals(middleEvent, patternMap.get("middle")); - assertEquals(endEvent, patternMap.get("end")); + compareMaps(resultingPatterns, Lists.>newArrayList( + Lists.newArrayList(startEvent, middleEvent, endEvent) + )); } @Test @@ -202,24 +190,11 @@ public class NFAITCase extends TestLogger { NFA nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); - Set> resultingPatterns = new HashSet<>(); - List> allPatterns = new ArrayList<>(); - - for (StreamRecord inputEvent : inputEvents) { - Collection> patterns = nfa.process( - inputEvent.getValue(), - inputEvent.getTimestamp()).f0; - - for (Map foundPattern : patterns) { - resultingPatterns.add(new HashSet<>(foundPattern.values())); - allPatterns.add(foundPattern.values()); - } - } + List> resultingPatterns = feedNFA(inputEvents, nfa); - assertEquals(1, allPatterns.size()); - assertEquals(Sets.>newHashSet( - Sets.newHashSet(middleEvent1, end) - ), resultingPatterns); + compareMaps(resultingPatterns, Lists.>newArrayList( + Lists.newArrayList(middleEvent1, end) + )); } @Test @@ -252,19 +227,9 @@ public class NFAITCase extends TestLogger { NFA nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); - Set> resultingPatterns = new HashSet<>(); - - for (StreamRecord inputEvent : inputEvents) { - Collection> patterns = nfa.process( - inputEvent.getValue(), - inputEvent.getTimestamp()).f0; - - for (Map foundPattern : patterns) { - resultingPatterns.add(new HashSet<>(foundPattern.values())); - } - } + List> resultingPatterns = feedNFA(inputEvents, nfa); - assertEquals(Sets.newHashSet(), resultingPatterns); + compareMaps(resultingPatterns, Lists.>newArrayList()); } /** @@ -274,7 +239,6 @@ public class NFAITCase extends TestLogger { @Test public void testSimplePatternWithTimeWindowNFA() { List> events = new ArrayList<>(); - List> resultingPatterns = new ArrayList<>(); final Event startEvent; final Event middleEvent; @@ -313,21 +277,11 @@ public class NFAITCase extends TestLogger { NFA nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); - for (StreamRecord event: events) { - Collection> patterns = nfa.process( - event.getValue(), - event.getTimestamp()).f0; - - resultingPatterns.addAll(patterns); - } - - assertEquals(1, resultingPatterns.size()); - - Map patternMap = resultingPatterns.get(0); + List> resultingPatterns = feedNFA(events, nfa); - assertEquals(startEvent, patternMap.get("start")); - assertEquals(middleEvent, patternMap.get("middle")); - assertEquals(endEvent, patternMap.get("end")); + compareMaps(resultingPatterns, Lists.>newArrayList( + Lists.newArrayList(startEvent, middleEvent, endEvent) + )); } /** @@ -337,9 +291,9 @@ public class NFAITCase extends TestLogger { @Test public void testSimplePatternWithTimeoutHandling() { List> events = new ArrayList<>(); - List> resultingPatterns = new ArrayList<>(); - Set, Long>> resultingTimeoutPatterns = new HashSet<>(); - Set, Long>> expectedTimeoutPatterns = new HashSet<>(); + List>> resultingPatterns = new ArrayList<>(); + Set>, Long>> resultingTimeoutPatterns = new HashSet<>(); + Set>, Long>> expectedTimeoutPatterns = new HashSet<>(); events.add(new StreamRecord<>(new Event(1, "start", 1.0), 1)); events.add(new StreamRecord<>(new Event(2, "start", 1.0), 2)); @@ -348,19 +302,19 @@ public class NFAITCase extends TestLogger { events.add(new StreamRecord<>(new Event(5, "end", 1.0), 11)); events.add(new StreamRecord<>(new Event(6, "end", 1.0), 13)); - Map timeoutPattern1 = new HashMap<>(); - timeoutPattern1.put("start", new Event(1, "start", 1.0)); - timeoutPattern1.put("middle", new Event(3, "middle", 1.0)); + Map> timeoutPattern1 = new HashMap<>(); + timeoutPattern1.put("start", Collections.singletonList(new Event(1, "start", 1.0))); + timeoutPattern1.put("middle", Collections.singletonList(new Event(3, "middle", 1.0))); - Map timeoutPattern2 = new HashMap<>(); - timeoutPattern2.put("start", new Event(2, "start", 1.0)); - timeoutPattern2.put("middle", new Event(3, "middle", 1.0)); + Map> timeoutPattern2 = new HashMap<>(); + timeoutPattern2.put("start", Collections.singletonList(new Event(2, "start", 1.0))); + timeoutPattern2.put("middle", Collections.singletonList(new Event(3, "middle", 1.0))); - Map timeoutPattern3 = new HashMap<>(); - timeoutPattern3.put("start", new Event(1, "start", 1.0)); + Map> timeoutPattern3 = new HashMap<>(); + timeoutPattern3.put("start", Collections.singletonList(new Event(1, "start", 1.0))); - Map timeoutPattern4 = new HashMap<>(); - timeoutPattern4.put("start", new Event(2, "start", 1.0)); + Map> timeoutPattern4 = new HashMap<>(); + timeoutPattern4.put("start", Collections.singletonList(new Event(2, "start", 1.0))); expectedTimeoutPatterns.add(Tuple2.of(timeoutPattern1, 11L)); expectedTimeoutPatterns.add(Tuple2.of(timeoutPattern2, 13L)); @@ -393,10 +347,11 @@ public class NFAITCase extends TestLogger { NFA nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), true); for (StreamRecord event: events) { - final Tuple2>, Collection, Long>>> patterns = nfa.process(event.getValue(), event.getTimestamp()); + Tuple2>>, Collection>, Long>>> patterns = + nfa.process(event.getValue(), event.getTimestamp()); - Collection> matchedPatterns = patterns.f0; - Collection, Long>> timeoutPatterns = patterns.f1; + Collection>> matchedPatterns = patterns.f0; + Collection>, Long>> timeoutPatterns = patterns.f1; resultingPatterns.addAll(matchedPatterns); resultingTimeoutPatterns.addAll(timeoutPatterns); @@ -460,31 +415,16 @@ public class NFAITCase extends TestLogger { NFA nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); - List> resultingPatterns = new ArrayList<>(); - - for (StreamRecord inputEvent: inputEvents) { - Collection> patterns = nfa.process( - inputEvent.getValue(), - inputEvent.getTimestamp()).f0; - - resultingPatterns.addAll(patterns); - } - - assertEquals(6, resultingPatterns.size()); - - final Set> patterns = new HashSet<>(); - for (Map resultingPattern : resultingPatterns) { - patterns.add(new HashSet<>(resultingPattern.values())); - } + List> resultingPatterns = feedNFA(inputEvents, nfa); - assertEquals(Sets.newHashSet( - Sets.newHashSet(startEvent, middleEvent1, nextOne1, endEvent), - Sets.newHashSet(startEvent, middleEvent2, nextOne1, endEvent), - Sets.newHashSet(startEvent, middleEvent3, nextOne1, endEvent), - Sets.newHashSet(startEvent, middleEvent1, nextOne2, endEvent), - Sets.newHashSet(startEvent, middleEvent2, nextOne2, endEvent), - Sets.newHashSet(startEvent, middleEvent3, nextOne2, endEvent) - ), patterns); + compareMaps(resultingPatterns, Lists.>newArrayList( + Lists.newArrayList(startEvent, middleEvent1, nextOne1, endEvent), + Lists.newArrayList(startEvent, middleEvent2, nextOne1, endEvent), + Lists.newArrayList(startEvent, middleEvent3, nextOne1, endEvent), + Lists.newArrayList(startEvent, middleEvent1, nextOne2, endEvent), + Lists.newArrayList(startEvent, middleEvent2, nextOne2, endEvent), + Lists.newArrayList(startEvent, middleEvent3, nextOne2, endEvent) + )); } @Test @@ -548,39 +488,26 @@ public class NFAITCase extends TestLogger { NFA nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); - Set> resultingPatterns = new HashSet<>(); - List> allPatterns = new ArrayList<>(); - - for (StreamRecord inputEvent : inputEvents) { - Collection> patterns = nfa.process( - inputEvent.getValue(), - inputEvent.getTimestamp()).f0; - - for (Map foundPattern : patterns) { - resultingPatterns.add(new HashSet<>(foundPattern.values())); - allPatterns.add(foundPattern.values()); - } - } + List> resultingPatterns = feedNFA(inputEvents, nfa); - assertEquals(16, allPatterns.size()); - assertEquals(Sets.newHashSet( - Sets.newHashSet(startEvent, middleEvent1, middleEvent2, middleEvent3, end1, end2, end4), - Sets.newHashSet(startEvent, middleEvent1, middleEvent2, end1, end2, end4), - Sets.newHashSet(startEvent, middleEvent1, middleEvent3, end1, end2, end4), - Sets.newHashSet(startEvent, middleEvent2, middleEvent3, end1, end2, end4), - Sets.newHashSet(startEvent, middleEvent1, end1, end2, end4), - Sets.newHashSet(startEvent, middleEvent2, end1, end2, end4), - Sets.newHashSet(startEvent, middleEvent3, end1, end2, end4), - Sets.newHashSet(startEvent, end1, end2, end4), - Sets.newHashSet(startEvent, middleEvent1, middleEvent2, middleEvent3, end1, end3, end4), - Sets.newHashSet(startEvent, middleEvent1, middleEvent2, end1, end3, end4), - Sets.newHashSet(startEvent, middleEvent1, middleEvent3, end1, end3, end4), - Sets.newHashSet(startEvent, middleEvent2, middleEvent3, end1, end3, end4), - Sets.newHashSet(startEvent, middleEvent1, end1, end3, end4), - Sets.newHashSet(startEvent, middleEvent2, end1, end3, end4), - Sets.newHashSet(startEvent, middleEvent3, end1, end3, end4), - Sets.newHashSet(startEvent, end1, end3, end4) - ), resultingPatterns); + compareMaps(resultingPatterns, Lists.>newArrayList( + Lists.newArrayList(startEvent, middleEvent1, middleEvent2, middleEvent3, end1, end2, end4), + Lists.newArrayList(startEvent, middleEvent1, middleEvent2, end1, end2, end4), + Lists.newArrayList(startEvent, middleEvent1, middleEvent3, end1, end2, end4), + Lists.newArrayList(startEvent, middleEvent2, middleEvent3, end1, end2, end4), + Lists.newArrayList(startEvent, middleEvent1, end1, end2, end4), + Lists.newArrayList(startEvent, middleEvent2, end1, end2, end4), + Lists.newArrayList(startEvent, middleEvent3, end1, end2, end4), + Lists.newArrayList(startEvent, end1, end2, end4), + Lists.newArrayList(startEvent, middleEvent1, middleEvent2, middleEvent3, end1, end3, end4), + Lists.newArrayList(startEvent, middleEvent1, middleEvent2, end1, end3, end4), + Lists.newArrayList(startEvent, middleEvent1, middleEvent3, end1, end3, end4), + Lists.newArrayList(startEvent, middleEvent2, middleEvent3, end1, end3, end4), + Lists.newArrayList(startEvent, middleEvent1, end1, end3, end4), + Lists.newArrayList(startEvent, middleEvent2, end1, end3, end4), + Lists.newArrayList(startEvent, middleEvent3, end1, end3, end4), + Lists.newArrayList(startEvent, end1, end3, end4) + )); } @Test @@ -674,27 +601,14 @@ public class NFAITCase extends TestLogger { NFA nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); - Set> resultingPatterns = new HashSet<>(); - List> allPatterns = new ArrayList<>(); - - for (StreamRecord inputEvent : inputEvents) { - Collection> patterns = nfa.process( - inputEvent.getValue(), - inputEvent.getTimestamp()).f0; - - for (Map foundPattern : patterns) { - resultingPatterns.add(new HashSet<>(foundPattern.values())); - allPatterns.add(foundPattern.values()); - } - } + final List> resultingPatterns = feedNFA(inputEvents, nfa); - assertEquals(4, allPatterns.size()); - assertEquals(Sets.newHashSet( - Sets.newHashSet(startEvent, middleEvent1, middleEvent2, middleEvent3, end1), - Sets.newHashSet(startEvent, middleEvent1, middleEvent2, end1), - Sets.newHashSet(startEvent, middleEvent1, end1), - Sets.newHashSet(startEvent, end1) - ), resultingPatterns); + compareMaps(resultingPatterns, Lists.>newArrayList( + Lists.newArrayList(startEvent, middleEvent1, middleEvent2, middleEvent3, end1), + Lists.newArrayList(startEvent, middleEvent1, middleEvent2, end1), + Lists.newArrayList(startEvent, middleEvent1, end1), + Lists.newArrayList(startEvent, end1) + )); } @Test @@ -729,30 +643,17 @@ public class NFAITCase extends TestLogger { NFA nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); - Set> resultingPatterns = new HashSet<>(); - List> allPatterns = new ArrayList<>(); - - for (StreamRecord inputEvent : inputEvents) { - Collection> patterns = nfa.process( - inputEvent.getValue(), - inputEvent.getTimestamp()).f0; - - for (Map foundPattern : patterns) { - resultingPatterns.add(new HashSet<>(foundPattern.values())); - allPatterns.add(foundPattern.values()); - } - } + final List> resultingPatterns = feedNFA(inputEvents, nfa); - assertEquals(7, allPatterns.size()); - assertEquals(Sets.newHashSet( - Sets.newHashSet(middleEvent1, middleEvent2, middleEvent3, end), - Sets.newHashSet(middleEvent1, middleEvent2, end), - Sets.newHashSet(middleEvent2, middleEvent3, end), - Sets.newHashSet(middleEvent1, end), - Sets.newHashSet(middleEvent2, end), - Sets.newHashSet(middleEvent3, end), - Sets.newHashSet(end) - ), resultingPatterns); + compareMaps(resultingPatterns, Lists.>newArrayList( + Lists.newArrayList(middleEvent1, middleEvent2, middleEvent3, end), + Lists.newArrayList(middleEvent1, middleEvent2, end), + Lists.newArrayList(middleEvent2, middleEvent3, end), + Lists.newArrayList(middleEvent1, end), + Lists.newArrayList(middleEvent2, end), + Lists.newArrayList(middleEvent3, end), + Lists.newArrayList(end) + )); } @Test @@ -805,29 +706,16 @@ public class NFAITCase extends TestLogger { NFA nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); - Set> resultingPatterns = new HashSet<>(); - List> allPatterns = new ArrayList<>(); - - for (StreamRecord inputEvent : inputEvents) { - Collection> patterns = nfa.process( - inputEvent.getValue(), - inputEvent.getTimestamp()).f0; - - for (Map foundPattern : patterns) { - resultingPatterns.add(new HashSet<>(foundPattern.values())); - allPatterns.add(foundPattern.values()); - } - } + final List> resultingPatterns = feedNFA(inputEvents, nfa); - assertEquals(6, allPatterns.size()); - assertEquals(Sets.newHashSet( - Sets.newHashSet(startEvent, middleEvent1, middleEvent2, middleEvent3, end), - Sets.newHashSet(startEvent, middleEvent1, middleEvent2, end), - Sets.newHashSet(startEvent, middleEvent2, middleEvent3, end), - Sets.newHashSet(startEvent, middleEvent2, end), - Sets.newHashSet(startEvent, middleEvent1, end), - Sets.newHashSet(startEvent, end) - ), resultingPatterns); + compareMaps(resultingPatterns, Lists.>newArrayList( + Lists.newArrayList(startEvent, middleEvent1, middleEvent2, middleEvent3, end), + Lists.newArrayList(startEvent, middleEvent1, middleEvent2, end), + Lists.newArrayList(startEvent, middleEvent2, middleEvent3, end), + Lists.newArrayList(startEvent, middleEvent2, end), + Lists.newArrayList(startEvent, middleEvent1, end), + Lists.newArrayList(startEvent, end) + )); } @Test @@ -889,31 +777,18 @@ public class NFAITCase extends TestLogger { NFA nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); - Set> resultingPatterns = new HashSet<>(); - List> allPatterns = new ArrayList<>(); - - for (StreamRecord inputEvent : inputEvents) { - Collection> patterns = nfa.process( - inputEvent.getValue(), - inputEvent.getTimestamp()).f0; - - for (Map foundPattern : patterns) { - resultingPatterns.add(new HashSet<>(foundPattern.values())); - allPatterns.add(foundPattern.values()); - } - } + final List> resultingPatterns = feedNFA(inputEvents, nfa); - assertEquals(8, allPatterns.size()); - assertEquals(Sets.newHashSet( - Sets.newHashSet(startEvent, middleEvent1, merging, end), - Sets.newHashSet(startEvent, middleEvent1, merging, kleene1, end), - Sets.newHashSet(startEvent, middleEvent1, merging, kleene2, end), - Sets.newHashSet(startEvent, middleEvent1, merging, kleene1, kleene2, end), - Sets.newHashSet(startEvent, middleEvent2, merging, end), - Sets.newHashSet(startEvent, middleEvent2, merging, kleene1, end), - Sets.newHashSet(startEvent, middleEvent2, merging, kleene2, end), - Sets.newHashSet(startEvent, middleEvent2, merging, kleene1, kleene2, end) - ), resultingPatterns); + compareMaps(resultingPatterns, Lists.>newArrayList( + Lists.newArrayList(startEvent, middleEvent1, merging, end), + Lists.newArrayList(startEvent, middleEvent1, merging, kleene1, end), + Lists.newArrayList(startEvent, middleEvent1, merging, kleene2, end), + Lists.newArrayList(startEvent, middleEvent1, merging, kleene1, kleene2, end), + Lists.newArrayList(startEvent, middleEvent2, merging, end), + Lists.newArrayList(startEvent, middleEvent2, merging, kleene1, end), + Lists.newArrayList(startEvent, middleEvent2, merging, kleene2, end), + Lists.newArrayList(startEvent, middleEvent2, merging, kleene1, kleene2, end) + )); } @Test @@ -958,19 +833,9 @@ public class NFAITCase extends TestLogger { NFA nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); - Set> resultingPatterns = new HashSet<>(); - - for (StreamRecord inputEvent : inputEvents) { - Collection> patterns = nfa.process( - inputEvent.getValue(), - inputEvent.getTimestamp()).f0; - - for (Map foundPattern : patterns) { - resultingPatterns.add(new HashSet<>(foundPattern.values())); - } - } + final List> resultingPatterns = feedNFA(inputEvents, nfa); - assertEquals(Sets.newHashSet(), resultingPatterns); + compareMaps(resultingPatterns, Lists.>newArrayList()); } @Test @@ -1059,26 +924,13 @@ public class NFAITCase extends TestLogger { NFA nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); - Set> resultingPatterns = new HashSet<>(); - List> allPatterns = new ArrayList<>(); - - for (StreamRecord inputEvent : inputEvents) { - Collection> patterns = nfa.process( - inputEvent.getValue(), - inputEvent.getTimestamp()).f0; - - for (Map foundPattern : patterns) { - resultingPatterns.add(new HashSet<>(foundPattern.values())); - allPatterns.add(foundPattern.values()); - } - } + final List> resultingPatterns = feedNFA(inputEvents, nfa); - assertEquals(3, allPatterns.size()); - assertEquals(Sets.newHashSet( - Sets.newHashSet(startEvent, middleEvent1, middleEvent2, end1), - Sets.newHashSet(startEvent, middleEvent1, end1), - Sets.newHashSet(startEvent, middleEvent2, end1) - ), resultingPatterns); + compareMaps(resultingPatterns, Lists.>newArrayList( + Lists.newArrayList(startEvent, middleEvent1, middleEvent2, end1), + Lists.newArrayList(startEvent, middleEvent1, end1), + Lists.newArrayList(startEvent, middleEvent2, end1) + )); } @Test @@ -1113,30 +965,17 @@ public class NFAITCase extends TestLogger { NFA nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); - Set> resultingPatterns = new HashSet<>(); - List> allPatterns = new ArrayList<>(); - - for (StreamRecord inputEvent : inputEvents) { - Collection> patterns = nfa.process( - inputEvent.getValue(), - inputEvent.getTimestamp()).f0; - - for (Map foundPattern : patterns) { - resultingPatterns.add(new HashSet<>(foundPattern.values())); - allPatterns.add(foundPattern.values()); - } - } + final List> resultingPatterns = feedNFA(inputEvents, nfa); - assertEquals(7, allPatterns.size()); - assertEquals(Sets.newHashSet( - Sets.newHashSet(startEvent1, startEvent2, startEvent3, end1), - Sets.newHashSet(startEvent1, startEvent2, end1), - Sets.newHashSet(startEvent1, startEvent3, end1), - Sets.newHashSet(startEvent2, startEvent3, end1), - Sets.newHashSet(startEvent1, end1), - Sets.newHashSet(startEvent2, end1), - Sets.newHashSet(startEvent3, end1) - ), resultingPatterns); + compareMaps(resultingPatterns, Lists.>newArrayList( + Lists.newArrayList(startEvent1, startEvent2, startEvent3, end1), + Lists.newArrayList(startEvent1, startEvent2, end1), + Lists.newArrayList(startEvent1, startEvent3, end1), + Lists.newArrayList(startEvent2, startEvent3, end1), + Lists.newArrayList(startEvent1, end1), + Lists.newArrayList(startEvent2, end1), + Lists.newArrayList(startEvent3, end1) + )); } @Test @@ -1181,24 +1020,11 @@ public class NFAITCase extends TestLogger { NFA nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); - Set> resultingPatterns = new HashSet<>(); - List> allPatterns = new ArrayList<>(); - - for (StreamRecord inputEvent : inputEvents) { - Collection> patterns = nfa.process( - inputEvent.getValue(), - inputEvent.getTimestamp()).f0; - - for (Map foundPattern : patterns) { - resultingPatterns.add(new HashSet<>(foundPattern.values())); - allPatterns.add(foundPattern.values()); - } - } + final List> resultingPatterns = feedNFA(inputEvents, nfa); - assertEquals(1, allPatterns.size()); - assertEquals(Sets.>newHashSet( - Sets.newHashSet(startEvent, endEvent) - ), resultingPatterns); + compareMaps(resultingPatterns, Lists.>newArrayList( + Lists.newArrayList(startEvent, endEvent) + )); } @Test @@ -1291,25 +1117,12 @@ public class NFAITCase extends TestLogger { NFA nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); - Set> resultingPatterns = new HashSet<>(); - List> allPatterns = new ArrayList<>(); - - for (StreamRecord inputEvent : inputEvents) { - Collection> patterns = nfa.process( - inputEvent.getValue(), - inputEvent.getTimestamp()).f0; - - for (Map foundPattern : patterns) { - resultingPatterns.add(new HashSet<>(foundPattern.values())); - allPatterns.add(foundPattern.values()); - } - } + final List> resultingPatterns = feedNFA(inputEvents, nfa); - assertEquals(2, allPatterns.size()); - assertEquals(Sets.newHashSet( - Sets.newHashSet(startEvent, middleEvent, end1), - Sets.newHashSet(startEvent, end1) - ), resultingPatterns); + compareMaps(resultingPatterns, Lists.>newArrayList( + Lists.newArrayList(startEvent, middleEvent, end1), + Lists.newArrayList(startEvent, end1) + )); } @Test @@ -1602,25 +1415,12 @@ public class NFAITCase extends TestLogger { NFA nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); - Set> resultingPatterns = new HashSet<>(); - List> allPatterns = new ArrayList<>(); - - for (StreamRecord inputEvent : inputEvents) { - Collection> patterns = nfa.process( - inputEvent.getValue(), - inputEvent.getTimestamp()).f0; - - for (Map foundPattern : patterns) { - resultingPatterns.add(new HashSet<>(foundPattern.values())); - allPatterns.add(foundPattern.values()); - } - } + final List> resultingPatterns = feedNFA(inputEvents, nfa); - assertEquals(2, allPatterns.size()); - assertEquals(Sets.newHashSet( - Sets.newHashSet(startEvent, end1), - Sets.newHashSet(end1) - ), resultingPatterns); + compareMaps(resultingPatterns, Lists.>newArrayList( + Lists.newArrayList(startEvent, end1), + Lists.newArrayList(end1) + )); } @Test @@ -1655,27 +1455,14 @@ public class NFAITCase extends TestLogger { NFA nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); - Set> resultingPatterns = new HashSet<>(); - List> allPatterns = new ArrayList<>(); - - for (StreamRecord inputEvent : inputEvents) { - Collection> patterns = nfa.process( - inputEvent.getValue(), - inputEvent.getTimestamp()).f0; - - for (Map foundPattern : patterns) { - resultingPatterns.add(new HashSet<>(foundPattern.values())); - allPatterns.add(foundPattern.values()); - } - } + final List> resultingPatterns = feedNFA(inputEvents, nfa); - assertEquals(4, allPatterns.size()); - assertEquals(Sets.newHashSet( - Sets.newHashSet(startEvent, middleEvent1, middleEvent2, middleEvent3), - Sets.newHashSet(startEvent, middleEvent1, middleEvent2), - Sets.newHashSet(startEvent, middleEvent1), - Sets.newHashSet(startEvent) - ), resultingPatterns); + compareMaps(resultingPatterns, Lists.>newArrayList( + Lists.newArrayList(startEvent, middleEvent1, middleEvent2, middleEvent3), + Lists.newArrayList(startEvent, middleEvent1, middleEvent2), + Lists.newArrayList(startEvent, middleEvent1), + Lists.newArrayList(startEvent) + )); } @Test @@ -1749,25 +1536,12 @@ public class NFAITCase extends TestLogger { NFA nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); - Set> resultingPatterns = new HashSet<>(); - List> allPatterns = new ArrayList<>(); - - for (StreamRecord inputEvent : inputEvents) { - Collection> patterns = nfa.process( - inputEvent.getValue(), - inputEvent.getTimestamp()).f0; - - for (Map foundPattern : patterns) { - resultingPatterns.add(new HashSet<>(foundPattern.values())); - allPatterns.add(foundPattern.values()); - } - } + final List> resultingPatterns = feedNFA(inputEvents, nfa); - assertEquals(2, allPatterns.size()); - assertEquals(Sets.newHashSet( - Sets.newHashSet(startEvent, middleEvent1), - Sets.newHashSet(startEvent) - ), resultingPatterns); + compareMaps(resultingPatterns, Lists.>newArrayList( + Lists.newArrayList(startEvent, middleEvent1), + Lists.newArrayList(startEvent) + )); } @Test @@ -1972,7 +1746,7 @@ public class NFAITCase extends TestLogger { public boolean filter(Event value) throws Exception { return value.getName().equals("a"); } - }).times(2).consecutive().optional().followedBy("end1").where(new SimpleCondition() { // TODO: 4/4/17 also check order consecutive() vs optional() + }).times(2).consecutive().optional().followedBy("end1").where(new SimpleCondition() { private static final long serialVersionUID = 5726188262756267490L; @Override @@ -2063,7 +1837,7 @@ public class NFAITCase extends TestLogger { public boolean filter(Event value) throws Exception { return value.getName().equals("a"); } - }).times(2).consecutive().optional().followedBy("end1").where(new SimpleCondition() { // TODO: 4/4/17 also check order consecutive() vs optional() + }).times(2).consecutive().optional().followedBy("end1").where(new SimpleCondition() { private static final long serialVersionUID = 5726188262756267490L; @Override @@ -2108,7 +1882,7 @@ public class NFAITCase extends TestLogger { public boolean filter(Event value) throws Exception { return value.getName().equals("a"); } - }).times(2).allowCombinations().optional().followedBy("end1").where(new SimpleCondition() { // TODO: 4/4/17 also check order consecutive() vs optional() + }).times(2).allowCombinations().optional().followedBy("end1").where(new SimpleCondition() { private static final long serialVersionUID = 5726188262756267490L; @Override @@ -3165,26 +2939,28 @@ public class NFAITCase extends TestLogger { inputEvents.add(new StreamRecord<>(d, 5)); Pattern pattern = Pattern.begin("start").where(new SimpleCondition() { - private static final long serialVersionUID = 5726188262756267490L; + private static final long serialVersionUID = 5167288560432018992L; @Override public boolean filter(Event value) throws Exception { return value.getName().equals("a"); } }).notNext("notPattern").where(new SimpleCondition() { + private static final long serialVersionUID = 2242479288129905510L; + @Override public boolean filter(Event value) throws Exception { return value.getName().equals("b"); } }).followedByAny("middle").where(new SimpleCondition() { - private static final long serialVersionUID = 5726188262756267490L; + private static final long serialVersionUID = 1404509325548220892L; @Override public boolean filter(Event value) throws Exception { return value.getName().equals("c"); } }).followedBy("end").where(new SimpleCondition() { - private static final long serialVersionUID = 5726188262756267490L; + private static final long serialVersionUID = -8907427230007830915L; @Override public boolean filter(Event value) throws Exception { @@ -3219,26 +2995,28 @@ public class NFAITCase extends TestLogger { inputEvents.add(new StreamRecord<>(d, 5)); Pattern pattern = Pattern.begin("start").where(new SimpleCondition() { - private static final long serialVersionUID = 5726188262756267490L; + private static final long serialVersionUID = -339500190577666439L; @Override public boolean filter(Event value) throws Exception { return value.getName().equals("a"); } }).notNext("notPattern").where(new SimpleCondition() { + private static final long serialVersionUID = -6913980632538046451L; + @Override public boolean filter(Event value) throws Exception { return value.getName().equals("b"); } }).followedBy("middle").where(new SimpleCondition() { - private static final long serialVersionUID = 5726188262756267490L; + private static final long serialVersionUID = 3332196998905139891L; @Override public boolean filter(Event value) throws Exception { return value.getName().equals("c"); } }).followedBy("end").where(new SimpleCondition() { - private static final long serialVersionUID = 5726188262756267490L; + private static final long serialVersionUID = 2086563479959018387L; @Override public boolean filter(Event value) throws Exception { @@ -3270,27 +3048,29 @@ public class NFAITCase extends TestLogger { inputEvents.add(new StreamRecord<>(b1, 5)); Pattern pattern = Pattern.begin("start").where(new SimpleCondition() { - private static final long serialVersionUID = 5726188262756267490L; + private static final long serialVersionUID = 1672995058886176627L; @Override public boolean filter(Event value) throws Exception { return value.getName().equals("a"); } }).followedByAny("middle").where(new SimpleCondition() { - private static final long serialVersionUID = 5726188262756267490L; + private static final long serialVersionUID = 6003621617520261554L; @Override public boolean filter(Event value) throws Exception { return value.getName().equals("c"); } }).followedByAny("end").where(new SimpleCondition() { - private static final long serialVersionUID = 5726188262756267490L; + private static final long serialVersionUID = 887700237024758417L; @Override public boolean filter(Event value) throws Exception { return value.getName().equals("d"); } }).notNext("notPattern").where(new SimpleCondition() { + private static final long serialVersionUID = 5239529076086933032L; + @Override public boolean filter(Event value) throws Exception { return value.getName().equals("b"); @@ -3321,26 +3101,28 @@ public class NFAITCase extends TestLogger { inputEvents.add(new StreamRecord<>(d, 5)); Pattern pattern = Pattern.begin("start").where(new SimpleCondition() { - private static final long serialVersionUID = 5726188262756267490L; + private static final long serialVersionUID = -2641662468313191976L; @Override public boolean filter(Event value) throws Exception { return value.getName().equals("a"); } }).notFollowedBy("notPattern").where(new SimpleCondition() { + private static final long serialVersionUID = -3632144132379494778L; + @Override public boolean filter(Event value) throws Exception { return value.getName().equals("b"); } }).followedByAny("middle").where(new SimpleCondition() { - private static final long serialVersionUID = 5726188262756267490L; + private static final long serialVersionUID = 3818766882138348167L; @Override public boolean filter(Event value) throws Exception { return value.getName().equals("c"); } }).followedBy("end").where(new SimpleCondition() { - private static final long serialVersionUID = 5726188262756267490L; + private static final long serialVersionUID = 2033204730795451288L; @Override public boolean filter(Event value) throws Exception { @@ -3374,26 +3156,28 @@ public class NFAITCase extends TestLogger { inputEvents.add(new StreamRecord<>(d, 5)); Pattern pattern = Pattern.begin("start").where(new SimpleCondition() { - private static final long serialVersionUID = 5726188262756267490L; + private static final long serialVersionUID = -2454396370205097543L; @Override public boolean filter(Event value) throws Exception { return value.getName().equals("a"); } }).notFollowedBy("notPattern").where(new SimpleCondition() { + private static final long serialVersionUID = 2749547391611263290L; + @Override public boolean filter(Event value) throws Exception { return value.getName().equals("b"); } }).followedByAny("middle").where(new SimpleCondition() { - private static final long serialVersionUID = 5726188262756267490L; + private static final long serialVersionUID = -4989511337298217255L; @Override public boolean filter(Event value) throws Exception { return value.getName().equals("c"); } }).optional().followedBy("end").where(new SimpleCondition() { - private static final long serialVersionUID = 5726188262756267490L; + private static final long serialVersionUID = -8466223836652936608L; @Override public boolean filter(Event value) throws Exception { @@ -3427,26 +3211,28 @@ public class NFAITCase extends TestLogger { inputEvents.add(new StreamRecord<>(d, 5)); Pattern pattern = Pattern.begin("start").where(new SimpleCondition() { - private static final long serialVersionUID = 5726188262756267490L; + private static final long serialVersionUID = -2568839911852184515L; @Override public boolean filter(Event value) throws Exception { return value.getName().equals("a"); } }).followedByAny("middle").where(new SimpleCondition() { - private static final long serialVersionUID = 5726188262756267490L; + private static final long serialVersionUID = -3632232424064269636L; @Override public boolean filter(Event value) throws Exception { return value.getName().equals("b"); } }).times(2).notFollowedBy("notPattern").where(new SimpleCondition() { + private static final long serialVersionUID = 3685596793523534611L; + @Override public boolean filter(Event value) throws Exception { return value.getName().equals("c"); } }).followedBy("end").where(new SimpleCondition() { - private static final long serialVersionUID = 5726188262756267490L; + private static final long serialVersionUID = 1960758663575587243L; @Override public boolean filter(Event value) throws Exception { @@ -3482,26 +3268,28 @@ public class NFAITCase extends TestLogger { inputEvents.add(new StreamRecord<>(d2, 5)); Pattern pattern = Pattern.begin("start").where(new SimpleCondition() { - private static final long serialVersionUID = 5726188262756267490L; + private static final long serialVersionUID = 2814850350025111940L; @Override public boolean filter(Event value) throws Exception { return value.getName().equals("a"); } }).notFollowedBy("notPattern").where(new SimpleCondition() { + private static final long serialVersionUID = 4988756153568853834L; + @Override public boolean filter(Event value) throws Exception { return value.getName().equals("b"); } }).followedByAny("middle").where(new SimpleCondition() { - private static final long serialVersionUID = 5726188262756267490L; + private static final long serialVersionUID = -225909103322018778L; @Override public boolean filter(Event value) throws Exception { return value.getName().equals("c"); } }).times(2).optional().followedBy("end").where(new SimpleCondition() { - private static final long serialVersionUID = 5726188262756267490L; + private static final long serialVersionUID = -924294627956373696L; @Override public boolean filter(Event value) throws Exception { @@ -3539,26 +3327,28 @@ public class NFAITCase extends TestLogger { inputEvents.add(new StreamRecord<>(d2, 5)); Pattern pattern = Pattern.begin("start").where(new SimpleCondition() { - private static final long serialVersionUID = 5726188262756267490L; + private static final long serialVersionUID = 6193105689601702341L; @Override public boolean filter(Event value) throws Exception { return value.getName().equals("a"); } }).followedByAny("middle").where(new SimpleCondition() { - private static final long serialVersionUID = 5726188262756267490L; + private static final long serialVersionUID = 5195859580923169111L; @Override public boolean filter(Event value) throws Exception { return value.getName().equals("b"); } }).times(2).notFollowedBy("notPattern").where(new SimpleCondition() { + private static final long serialVersionUID = 4973027956103783831L; + @Override public boolean filter(Event value) throws Exception { return value.getName().equals("c"); } }).followedBy("end").where(new SimpleCondition() { - private static final long serialVersionUID = 5726188262756267490L; + private static final long serialVersionUID = 2724622546678984894L; @Override public boolean filter(Event value) throws Exception { @@ -3588,19 +3378,21 @@ public class NFAITCase extends TestLogger { inputEvents.add(new StreamRecord<>(c2, 4)); Pattern pattern = Pattern.begin("start").where(new SimpleCondition() { - private static final long serialVersionUID = 5726188262756267490L; + private static final long serialVersionUID = -4289351792573443294L; @Override public boolean filter(Event value) throws Exception { return value.getName().equals("a"); } }).notFollowedBy("notPattern").where(new SimpleCondition() { + private static final long serialVersionUID = -4989574608417523507L; + @Override public boolean filter(Event value) throws Exception { return value.getName().equals("b"); } }).followedByAny("end").where(new SimpleCondition() { - private static final long serialVersionUID = 5726188262756267490L; + private static final long serialVersionUID = -5940131818629290579L; @Override public boolean filter(Event value) throws Exception { @@ -3635,26 +3427,28 @@ public class NFAITCase extends TestLogger { inputEvents.add(new StreamRecord<>(d, 5)); Pattern pattern = Pattern.begin("start").where(new SimpleCondition() { - private static final long serialVersionUID = 5726188262756267490L; + private static final long serialVersionUID = -7885381452276160322L; @Override public boolean filter(Event value) throws Exception { return value.getName().equals("a"); } }).notFollowedBy("notPattern").where(new SimpleCondition() { + private static final long serialVersionUID = 3471511260235826653L; + @Override public boolean filter(Event value) throws Exception { return value.getName().equals("b"); } }).followedByAny("middle").where(new SimpleCondition() { - private static final long serialVersionUID = 5726188262756267490L; + private static final long serialVersionUID = 9073793782452363833L; @Override public boolean filter(Event value) throws Exception { return value.getName().equals("c"); } }).times(2).optional().followedBy("end").where(new SimpleCondition() { - private static final long serialVersionUID = 5726188262756267490L; + private static final long serialVersionUID = 7972902718259767076L; @Override public boolean filter(Event value) throws Exception { @@ -3690,26 +3484,28 @@ public class NFAITCase extends TestLogger { inputEvents.add(new StreamRecord<>(d, 6)); Pattern pattern = Pattern.begin("start").where(new SimpleCondition() { - private static final long serialVersionUID = 5726188262756267490L; + private static final long serialVersionUID = -7866220136345465444L; @Override public boolean filter(Event value) throws Exception { return value.getName().equals("a"); } }).notFollowedBy("notPattern").where(new SimpleCondition() { + private static final long serialVersionUID = 4957837489028234932L; + @Override public boolean filter(Event value) throws Exception { return value.getName().equals("b"); } }).followedBy("middle").where(new SimpleCondition() { - private static final long serialVersionUID = 5726188262756267490L; + private static final long serialVersionUID = 5569569968862808007L; @Override public boolean filter(Event value) throws Exception { return value.getName().equals("c"); } }).followedBy("end").where(new SimpleCondition() { - private static final long serialVersionUID = 5726188262756267490L; + private static final long serialVersionUID = -8579678167937416269L; @Override public boolean filter(Event value) throws Exception { @@ -4125,12 +3921,16 @@ public class NFAITCase extends TestLogger { List> resultingPatterns = new ArrayList<>(); for (StreamRecord inputEvent : inputEvents) { - Collection> patterns = nfa.process( + Collection>> patterns = nfa.process( inputEvent.getValue(), inputEvent.getTimestamp()).f0; - for (Map p: patterns) { - resultingPatterns.add(new ArrayList<>(p.values())); + for (Map> p: patterns) { + List res = new ArrayList<>(); + for (List le: p.values()) { + res.addAll(le); + } + resultingPatterns.add(res); } } return resultingPatterns; http://git-wip-us.apache.org/repos/asf/flink/blob/fa64a60f/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java index d2e392b..11d193a 100644 --- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java +++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java @@ -82,20 +82,20 @@ public class NFATest extends TestLogger { nfa.addState(endState); nfa.addState(endingState); - Set> expectedPatterns = new HashSet<>(); + Set>> expectedPatterns = new HashSet<>(); - Map firstPattern = new HashMap<>(); - firstPattern.put("start", new Event(1, "start", 1.0)); - firstPattern.put("end", new Event(4, "end", 4.0)); + Map> firstPattern = new HashMap<>(); + firstPattern.put("start", Collections.singletonList(new Event(1, "start", 1.0))); + firstPattern.put("end", Collections.singletonList(new Event(4, "end", 4.0))); - Map secondPattern = new HashMap<>(); - secondPattern.put("start", new Event(3, "start", 3.0)); - secondPattern.put("end", new Event(4, "end", 4.0)); + Map> secondPattern = new HashMap<>(); + secondPattern.put("start", Collections.singletonList(new Event(3, "start", 3.0))); + secondPattern.put("end", Collections.singletonList(new Event(4, "end", 4.0))); expectedPatterns.add(firstPattern); expectedPatterns.add(secondPattern); - Collection> actualPatterns = runNFA(nfa, streamEvents); + Collection>> actualPatterns = runNFA(nfa, streamEvents); assertEquals(expectedPatterns, actualPatterns); } @@ -110,15 +110,15 @@ public class NFATest extends TestLogger { streamEvents.add(new StreamRecord<>(new Event(3, "start", 3.0), 3L)); streamEvents.add(new StreamRecord<>(new Event(4, "end", 4.0), 4L)); - Set> expectedPatterns = new HashSet<>(); + Set>> expectedPatterns = new HashSet<>(); - Map secondPattern = new HashMap<>(); - secondPattern.put("start", new Event(3, "start", 3.0)); - secondPattern.put("end", new Event(4, "end", 4.0)); + Map> secondPattern = new HashMap<>(); + secondPattern.put("start", Collections.singletonList(new Event(3, "start", 3.0))); + secondPattern.put("end", Collections.singletonList(new Event(4, "end", 4.0))); expectedPatterns.add(secondPattern); - Collection> actualPatterns = runNFA(nfa, streamEvents); + Collection>> actualPatterns = runNFA(nfa, streamEvents); assertEquals(expectedPatterns, actualPatterns); } @@ -135,9 +135,9 @@ public class NFATest extends TestLogger { streamEvents.add(new StreamRecord<>(new Event(1, "start", 1.0), 1L)); streamEvents.add(new StreamRecord<>(new Event(2, "end", 2.0), 3L)); - Set> expectedPatterns = Collections.emptySet(); + Set>> expectedPatterns = Collections.emptySet(); - Collection> actualPatterns = runNFA(nfa, streamEvents); + Collection>> actualPatterns = runNFA(nfa, streamEvents); assertEquals(expectedPatterns, actualPatterns); } @@ -156,40 +156,24 @@ public class NFATest extends TestLogger { streamEvents.add(new StreamRecord<>(new Event(3, "foobar", 3.0), 3L)); streamEvents.add(new StreamRecord<>(new Event(4, "end", 4.0), 3L)); - Set> expectedPatterns = new HashSet<>(); + Set>> expectedPatterns = new HashSet<>(); - Map secondPattern = new HashMap<>(); - secondPattern.put("start", new Event(2, "start", 2.0)); - secondPattern.put("end", new Event(4, "end", 4.0)); + Map> secondPattern = new HashMap<>(); + secondPattern.put("start", Collections.singletonList(new Event(2, "start", 2.0))); + secondPattern.put("end", Collections.singletonList(new Event(4, "end", 4.0))); expectedPatterns.add(secondPattern); - Collection> actualPatterns = runNFA(nfa, streamEvents); + Collection>> actualPatterns = runNFA(nfa, streamEvents); assertEquals(expectedPatterns, actualPatterns); } - @Test - public void testStateNameGeneration() { - String expectedName1 = "a[2]"; - String expectedName2 = "a_3"; - String expectedName3 = "a[][42]"; - - String generatedName1 = NFA.generateStateName("a[]", 2); - String generatedName2 = NFA.generateStateName("a", 3); - String generatedName3 = NFA.generateStateName("a[][]", 42); - - - assertEquals(expectedName1, generatedName1); - assertEquals(expectedName2, generatedName2); - assertEquals(expectedName3, generatedName3); - } - - public Collection> runNFA(NFA nfa, List> inputs) { - Set> actualPatterns = new HashSet<>(); + public Collection>> runNFA(NFA nfa, List> inputs) { + Set>> actualPatterns = new HashSet<>(); for (StreamRecord streamEvent : inputs) { - Collection> matchedPatterns = nfa.process( + Collection>> matchedPatterns = nfa.process( streamEvent.getValue(), streamEvent.getTimestamp()).f0; http://git-wip-us.apache.org/repos/asf/flink/blob/fa64a60f/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SharedBufferTest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SharedBufferTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SharedBufferTest.java index adc07b3..2da3c31 100644 --- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SharedBufferTest.java +++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SharedBufferTest.java @@ -18,7 +18,8 @@ package org.apache.flink.cep.nfa; -import com.google.common.collect.LinkedHashMultimap; +import com.google.common.collect.ArrayListMultimap; +import com.google.common.collect.ListMultimap; import org.apache.flink.cep.Event; import org.apache.flink.util.TestLogger; import org.junit.Test; @@ -48,12 +49,12 @@ public class SharedBufferTest extends TestLogger { events[i] = new Event(i + 1, "e" + (i + 1), i); } - LinkedHashMultimap expectedPattern1 = LinkedHashMultimap.create(); + ListMultimap expectedPattern1 = ArrayListMultimap.create(); expectedPattern1.put("a1", events[2]); expectedPattern1.put("a[]", events[3]); expectedPattern1.put("b", events[5]); - LinkedHashMultimap expectedPattern2 = LinkedHashMultimap.create(); + ListMultimap expectedPattern2 = ArrayListMultimap.create(); expectedPattern2.put("a1", events[0]); expectedPattern2.put("a[]", events[1]); expectedPattern2.put("a[]", events[2]); @@ -61,7 +62,7 @@ public class SharedBufferTest extends TestLogger { expectedPattern2.put("a[]", events[4]); expectedPattern2.put("b", events[5]); - LinkedHashMultimap expectedPattern3 = LinkedHashMultimap.create(); + ListMultimap expectedPattern3 = ArrayListMultimap.create(); expectedPattern3.put("a1", events[0]); expectedPattern3.put("a[]", events[1]); expectedPattern3.put("a[]", events[2]); @@ -84,11 +85,11 @@ public class SharedBufferTest extends TestLogger { sharedBuffer.put("a[]", events[6], timestamp, "a[]", events[5], timestamp, DeweyNumber.fromString("1.1")); sharedBuffer.put("b", events[7], timestamp, "a[]", events[6], timestamp, DeweyNumber.fromString("1.1.0")); - Collection> patterns3 = sharedBuffer.extractPatterns("b", events[7], timestamp, DeweyNumber.fromString("1.1.0")); + Collection> patterns3 = sharedBuffer.extractPatterns("b", events[7], timestamp, DeweyNumber.fromString("1.1.0")); sharedBuffer.release("b", events[7], timestamp); - Collection> patterns4 = sharedBuffer.extractPatterns("b", events[7], timestamp, DeweyNumber.fromString("1.1.0")); - Collection> patterns1 = sharedBuffer.extractPatterns("b", events[5], timestamp, DeweyNumber.fromString("2.0.0")); - Collection> patterns2 = sharedBuffer.extractPatterns("b", events[5], timestamp, DeweyNumber.fromString("1.0.0")); + Collection> patterns4 = sharedBuffer.extractPatterns("b", events[7], timestamp, DeweyNumber.fromString("1.1.0")); + Collection> patterns1 = sharedBuffer.extractPatterns("b", events[5], timestamp, DeweyNumber.fromString("2.0.0")); + Collection> patterns2 = sharedBuffer.extractPatterns("b", events[5], timestamp, DeweyNumber.fromString("1.0.0")); sharedBuffer.release("b", events[5], timestamp); assertEquals(1L, patterns3.size()); http://git-wip-us.apache.org/repos/asf/flink/blob/fa64a60f/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java index 90a6321..26b8ce9 100644 --- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java +++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java @@ -18,7 +18,6 @@ package org.apache.flink.cep.nfa.compiler; -import com.google.common.collect.Sets; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.tuple.Tuple2; http://git-wip-us.apache.org/repos/asf/flink/blob/fa64a60f/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPFrom12MigrationTest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPFrom12MigrationTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPFrom12MigrationTest.java index 2f7cdeb..afb3e7c 100644 --- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPFrom12MigrationTest.java +++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPFrom12MigrationTest.java @@ -36,6 +36,7 @@ import org.apache.flink.streaming.util.OperatorSnapshotUtil; import org.junit.Ignore; import org.junit.Test; +import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentLinkedQueue; @@ -72,7 +73,7 @@ public class CEPFrom12MigrationTest { final SubEvent middleEvent1 = new SubEvent(42, "foo1", 1.0, 10.0); final SubEvent middleEvent2 = new SubEvent(42, "foo2", 2.0, 10.0); - OneInputStreamOperatorTestHarness> harness = + OneInputStreamOperatorTestHarness>> harness = new KeyedOneInputStreamOperatorTestHarness<>( new KeyedCEPPatternOperator<>( Event.createTypeSerializer(), @@ -120,7 +121,7 @@ public class CEPFrom12MigrationTest { final SubEvent middleEvent2 = new SubEvent(42, "foo2", 2.0, 10.0); final Event endEvent = new Event(42, "end", 1.0); - OneInputStreamOperatorTestHarness> harness = + OneInputStreamOperatorTestHarness>> harness = new KeyedOneInputStreamOperatorTestHarness<>( new KeyedCEPPatternOperator<>( Event.createTypeSerializer(), @@ -160,18 +161,18 @@ public class CEPFrom12MigrationTest { assertTrue(resultRecord2.getValue() instanceof Map); @SuppressWarnings("unchecked") - Map patternMap1 = (Map) resultRecord1.getValue(); + Map> patternMap1 = (Map>) resultRecord1.getValue(); - assertEquals(startEvent, patternMap1.get("start")); - assertEquals(middleEvent1, patternMap1.get("middle")); - assertEquals(endEvent, patternMap1.get("end")); + assertEquals(startEvent, patternMap1.get("start").get(0)); + assertEquals(middleEvent1, patternMap1.get("middle").get(0)); + assertEquals(endEvent, patternMap1.get("end").get(0)); @SuppressWarnings("unchecked") - Map patternMap2 = (Map) resultRecord2.getValue(); + Map> patternMap2 = (Map>) resultRecord2.getValue(); - assertEquals(startEvent, patternMap2.get("start")); - assertEquals(middleEvent2, patternMap2.get("middle")); - assertEquals(endEvent, patternMap2.get("end")); + assertEquals(startEvent, patternMap2.get("start").get(0)); + assertEquals(middleEvent2, patternMap2.get("middle").get(0)); + assertEquals(endEvent, patternMap2.get("end").get(0)); harness.close(); } @@ -195,7 +196,7 @@ public class CEPFrom12MigrationTest { final Event startEvent1 = new Event(42, "start", 1.0); final SubEvent middleEvent1 = new SubEvent(42, "foo1", 1.0, 10.0); - OneInputStreamOperatorTestHarness> harness = + OneInputStreamOperatorTestHarness>> harness = new KeyedOneInputStreamOperatorTestHarness<>( new KeyedCEPPatternOperator<>( Event.createTypeSerializer(), @@ -241,7 +242,7 @@ public class CEPFrom12MigrationTest { final SubEvent middleEvent2 = new SubEvent(42, "foo2", 2.0, 10.0); final Event endEvent = new Event(42, "end", 1.0); - OneInputStreamOperatorTestHarness> harness = + OneInputStreamOperatorTestHarness>> harness = new KeyedOneInputStreamOperatorTestHarness<>( new KeyedCEPPatternOperator<>( Event.createTypeSerializer(), @@ -287,25 +288,25 @@ public class CEPFrom12MigrationTest { assertTrue(resultRecord3.getValue() instanceof Map); @SuppressWarnings("unchecked") - Map patternMap1 = (Map) resultRecord1.getValue(); + Map> patternMap1 = (Map>) resultRecord1.getValue(); - assertEquals(startEvent1, patternMap1.get("start")); - assertEquals(middleEvent1, patternMap1.get("middle")); - assertEquals(endEvent, patternMap1.get("end")); + assertEquals(startEvent1, patternMap1.get("start").get(0)); + assertEquals(middleEvent1, patternMap1.get("middle").get(0)); + assertEquals(endEvent, patternMap1.get("end").get(0)); @SuppressWarnings("unchecked") - Map patternMap2 = (Map) resultRecord2.getValue(); + Map> patternMap2 = (Map>) resultRecord2.getValue(); - assertEquals(startEvent1, patternMap2.get("start")); - assertEquals(middleEvent2, patternMap2.get("middle")); - assertEquals(endEvent, patternMap2.get("end")); + assertEquals(startEvent1, patternMap2.get("start").get(0)); + assertEquals(middleEvent2, patternMap2.get("middle").get(0)); + assertEquals(endEvent, patternMap2.get("end").get(0)); @SuppressWarnings("unchecked") - Map patternMap3 = (Map) resultRecord3.getValue(); + Map> patternMap3 = (Map>) resultRecord3.getValue(); - assertEquals(startEvent2, patternMap3.get("start")); - assertEquals(middleEvent2, patternMap3.get("middle")); - assertEquals(endEvent, patternMap3.get("end")); + assertEquals(startEvent2, patternMap3.get("start").get(0)); + assertEquals(middleEvent2, patternMap3.get("middle").get(0)); + assertEquals(endEvent, patternMap3.get("end").get(0)); harness.close(); } @@ -328,7 +329,7 @@ public class CEPFrom12MigrationTest { final Event startEvent1 = new Event(42, "start", 1.0); - OneInputStreamOperatorTestHarness> harness = + OneInputStreamOperatorTestHarness>> harness = new KeyedOneInputStreamOperatorTestHarness<>( new KeyedCEPPatternOperator<>( Event.createTypeSerializer(), @@ -367,7 +368,7 @@ public class CEPFrom12MigrationTest { final Event startEvent1 = new Event(42, "start", 1.0); - OneInputStreamOperatorTestHarness> harness = + OneInputStreamOperatorTestHarness>> harness = new KeyedOneInputStreamOperatorTestHarness<>( new KeyedCEPPatternOperator<>( Event.createTypeSerializer(), @@ -401,9 +402,9 @@ public class CEPFrom12MigrationTest { assertTrue(resultRecord.getValue() instanceof Map); @SuppressWarnings("unchecked") - Map patternMap = (Map) resultRecord.getValue(); + Map> patternMap = (Map>) resultRecord.getValue(); - assertEquals(startEvent1, patternMap.get("start")); + assertEquals(startEvent1, patternMap.get("start").get(0)); harness.close(); } http://git-wip-us.apache.org/repos/asf/flink/blob/fa64a60f/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java index 4e05fcf..404de54 100644 --- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java +++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java @@ -37,6 +37,7 @@ import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; import org.junit.Test; import java.net.URL; +import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentLinkedQueue; @@ -95,7 +96,7 @@ public class CEPMigration11to13Test { harness.close(); */ - OneInputStreamOperatorTestHarness> harness = + OneInputStreamOperatorTestHarness>> harness = new KeyedOneInputStreamOperatorTestHarness<>( new KeyedCEPPatternOperator<>( Event.createTypeSerializer(), @@ -129,11 +130,11 @@ public class CEPMigration11to13Test { assertTrue(resultRecord.getValue() instanceof Map); @SuppressWarnings("unchecked") - Map patternMap = (Map) resultRecord.getValue(); + Map> patternMap = (Map>) resultRecord.getValue(); - assertEquals(startEvent, patternMap.get("start")); - assertEquals(middleEvent, patternMap.get("middle")); - assertEquals(endEvent, patternMap.get("end")); + assertEquals(startEvent, patternMap.get("start").get(0)); + assertEquals(middleEvent, patternMap.get("middle").get(0)); + assertEquals(endEvent, patternMap.get("end").get(0)); harness.close(); } @@ -170,7 +171,7 @@ public class CEPMigration11to13Test { NullByteKeySelector keySelector = new NullByteKeySelector(); - OneInputStreamOperatorTestHarness> harness = + OneInputStreamOperatorTestHarness>> harness = new KeyedOneInputStreamOperatorTestHarness<>( new KeyedCEPPatternOperator<>( Event.createTypeSerializer(), @@ -204,11 +205,11 @@ public class CEPMigration11to13Test { assertTrue(resultRecord.getValue() instanceof Map); @SuppressWarnings("unchecked") - Map patternMap = (Map) resultRecord.getValue(); + Map> patternMap = (Map>) resultRecord.getValue(); - assertEquals(startEvent, patternMap.get("start")); - assertEquals(middleEvent, patternMap.get("middle")); - assertEquals(endEvent, patternMap.get("end")); + assertEquals(startEvent, patternMap.get("start").get(0)); + assertEquals(middleEvent, patternMap.get("middle").get(0)); + assertEquals(endEvent, patternMap.get("end").get(0)); harness.close(); } http://git-wip-us.apache.org/repos/asf/flink/blob/fa64a60f/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java index 4048bc2..5ed8b46 100644 --- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java +++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java @@ -46,7 +46,9 @@ import org.junit.rules.TemporaryFolder; import static org.junit.Assert.*; +import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Queue; @@ -58,7 +60,7 @@ public class CEPOperatorTest extends TestLogger { @Test public void testKeyedCEPOperatorWatermarkForwarding() throws Exception { - OneInputStreamOperatorTestHarness> harness = getCepTestHarness(false); + OneInputStreamOperatorTestHarness>> harness = getCepTestHarness(false); harness.open(); @@ -74,7 +76,7 @@ public class CEPOperatorTest extends TestLogger { @Test public void testKeyedCEPOperatorCheckpointing() throws Exception { - OneInputStreamOperatorTestHarness> harness = getCepTestHarness(false); + OneInputStreamOperatorTestHarness>> harness = getCepTestHarness(false); harness.open(); @@ -138,7 +140,7 @@ public class CEPOperatorTest extends TestLogger { RocksDBStateBackend rocksDBStateBackend = new RocksDBStateBackend(new MemoryStateBackend()); rocksDBStateBackend.setDbStoragePath(rocksDbPath); - OneInputStreamOperatorTestHarness> harness = getCepTestHarness(false); + OneInputStreamOperatorTestHarness>> harness = getCepTestHarness(false); harness.setStateBackend(rocksDBStateBackend); @@ -208,7 +210,6 @@ public class CEPOperatorTest extends TestLogger { * Tests that the internal time of a CEP operator advances only given watermarks. See FLINK-5033 */ @Test - @SuppressWarnings("unchecked") public void testKeyedAdvancingTimeWithoutElements() throws Exception { final KeySelector keySelector = new TestKeySelector(); @@ -216,10 +217,10 @@ public class CEPOperatorTest extends TestLogger { final long watermarkTimestamp1 = 5L; final long watermarkTimestamp2 = 13L; - final Map expectedSequence = new HashMap<>(2); - expectedSequence.put("start", startEvent); + final Map> expectedSequence = new HashMap<>(2); + expectedSequence.put("start", Collections.singletonList(startEvent)); - OneInputStreamOperatorTestHarness, Long>, Map>> harness = new KeyedOneInputStreamOperatorTestHarness<>( + OneInputStreamOperatorTestHarness>, Long>, Map>>> harness = new KeyedOneInputStreamOperatorTestHarness<>( new TimeoutKeyedCEPPatternOperator<>( Event.createTypeSerializer(), false, @@ -234,7 +235,7 @@ public class CEPOperatorTest extends TestLogger { try { harness.setup( new KryoSerializer<>( - (Class, Long>, Map>>) (Object) Either.class, + (Class>, Long>, Map>>>) (Object) Either.class, new ExecutionConfig())); harness.open(); @@ -256,13 +257,15 @@ public class CEPOperatorTest extends TestLogger { assertTrue(resultObject instanceof StreamRecord); - StreamRecord, Long>, Map>> streamRecord = (StreamRecord,Long>,Map>>) resultObject; + StreamRecord>, Long>, Map>>> streamRecord = + (StreamRecord>,Long>,Map>>>) resultObject; assertTrue(streamRecord.getValue() instanceof Either.Left); - Either.Left, Long>, Map> left = (Either.Left, Long>, Map>) streamRecord.getValue(); + Either.Left>, Long>, Map>> left = + (Either.Left>, Long>, Map>>) streamRecord.getValue(); - Tuple2, Long> leftResult = left.left(); + Tuple2>, Long> leftResult = left.left(); assertEquals(watermarkTimestamp2, (long) leftResult.f1); assertEquals(expectedSequence, leftResult.f0); @@ -292,7 +295,7 @@ public class CEPOperatorTest extends TestLogger { TestKeySelector keySelector = new TestKeySelector(); KeyedCEPPatternOperator operator = getKeyedCepOpearator(false, keySelector); - OneInputStreamOperatorTestHarness> harness = getCepTestHarness(operator); + OneInputStreamOperatorTestHarness>> harness = getCepTestHarness(operator); harness.open(); @@ -380,7 +383,7 @@ public class CEPOperatorTest extends TestLogger { TestKeySelector keySelector = new TestKeySelector(); KeyedCEPPatternOperator operator = getKeyedCepOpearator(true, keySelector); - OneInputStreamOperatorTestHarness> harness = getCepTestHarness(operator); + OneInputStreamOperatorTestHarness>> harness = getCepTestHarness(operator); harness.open(); @@ -449,13 +452,13 @@ public class CEPOperatorTest extends TestLogger { assertTrue(resultRecord.getValue() instanceof Map); @SuppressWarnings("unchecked") - Map patternMap = (Map) resultRecord.getValue(); - assertEquals(start, patternMap.get("start")); - assertEquals(middle, patternMap.get("middle")); - assertEquals(end, patternMap.get("end")); + Map> patternMap = (Map>) resultRecord.getValue(); + assertEquals(start, patternMap.get("start").get(0)); + assertEquals(middle, patternMap.get("middle").get(0)); + assertEquals(end, patternMap.get("end").get(0)); } - private OneInputStreamOperatorTestHarness> getCepTestHarness(boolean isProcessingTime) throws Exception { + private OneInputStreamOperatorTestHarness>> getCepTestHarness(boolean isProcessingTime) throws Exception { KeySelector keySelector = new TestKeySelector(); return new KeyedOneInputStreamOperatorTestHarness<>( @@ -464,7 +467,7 @@ public class CEPOperatorTest extends TestLogger { BasicTypeInfo.INT_TYPE_INFO); } - private OneInputStreamOperatorTestHarness> getCepTestHarness( + private OneInputStreamOperatorTestHarness>> getCepTestHarness( KeyedCEPPatternOperator cepOperator) throws Exception { KeySelector keySelector = new TestKeySelector(); http://git-wip-us.apache.org/repos/asf/flink/blob/fa64a60f/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPRescalingTest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPRescalingTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPRescalingTest.java index a048183..0210ef9 100644 --- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPRescalingTest.java +++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPRescalingTest.java @@ -37,6 +37,7 @@ import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness; import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; import org.junit.Test; +import java.util.List; import java.util.Map; import java.util.Queue; @@ -79,7 +80,7 @@ public class CEPRescalingTest { // now we start the test, we go from parallelism 1 to 2. - OneInputStreamOperatorTestHarness> harness = + OneInputStreamOperatorTestHarness>> harness = getTestHarness(maxParallelism, 1, 0); harness.open(); @@ -99,7 +100,7 @@ public class CEPRescalingTest { // so we initialize the two tasks and we put the rest of // the valid elements for the pattern on task 0. - OneInputStreamOperatorTestHarness> harness1 = + OneInputStreamOperatorTestHarness>> harness1 = getTestHarness(maxParallelism, 2, 0); harness1.setup(); @@ -120,7 +121,7 @@ public class CEPRescalingTest { verifyWatermark(harness1.getOutput().poll(), 2); verifyPattern(harness1.getOutput().poll(), startEvent1, middleEvent1, endEvent1); - OneInputStreamOperatorTestHarness> harness2 = + OneInputStreamOperatorTestHarness>> harness2 = getTestHarness(maxParallelism, 2, 1); harness2.setup(); @@ -198,15 +199,15 @@ public class CEPRescalingTest { // starting the test, we will go from parallelism of 3 to parallelism of 2 - OneInputStreamOperatorTestHarness> harness1 = + OneInputStreamOperatorTestHarness>> harness1 = getTestHarness(maxParallelism, 3, 0); harness1.open(); - OneInputStreamOperatorTestHarness> harness2 = + OneInputStreamOperatorTestHarness>> harness2 = getTestHarness(maxParallelism, 3, 1); harness2.open(); - OneInputStreamOperatorTestHarness> harness3 = + OneInputStreamOperatorTestHarness>> harness3 = getTestHarness(maxParallelism, 3, 2); harness3.open(); @@ -251,13 +252,13 @@ public class CEPRescalingTest { harness3.snapshot(0, 0) ); - OneInputStreamOperatorTestHarness> harness4 = + OneInputStreamOperatorTestHarness>> harness4 = getTestHarness(maxParallelism, 2, 0); harness4.setup(); harness4.initializeState(snapshot); harness4.open(); - OneInputStreamOperatorTestHarness> harness5 = + OneInputStreamOperatorTestHarness>> harness5 = getTestHarness(maxParallelism, 2, 1); harness5.setup(); harness5.initializeState(snapshot); @@ -295,8 +296,8 @@ public class CEPRescalingTest { assertTrue(resultRecord.getValue() instanceof Map); @SuppressWarnings("unchecked") - Map patternMap = (Map) resultRecord.getValue(); - if (patternMap.get("start").getId() == 7) { + Map> patternMap = (Map>) resultRecord.getValue(); + if (patternMap.get("start").get(0).getId() == 7) { verifyPattern(harness4.getOutput().poll(), startEvent1, middleEvent1, endEvent1); verifyPattern(harness4.getOutput().poll(), startEvent3, middleEvent3, endEvent3); } else { @@ -327,13 +328,13 @@ public class CEPRescalingTest { assertTrue(resultRecord.getValue() instanceof Map); @SuppressWarnings("unchecked") - Map patternMap = (Map) resultRecord.getValue(); - assertEquals(start, patternMap.get("start")); - assertEquals(middle, patternMap.get("middle")); - assertEquals(end, patternMap.get("end")); + Map> patternMap = (Map>) resultRecord.getValue(); + assertEquals(start, patternMap.get("start").get(0)); + assertEquals(middle, patternMap.get("middle").get(0)); + assertEquals(end, patternMap.get("end").get(0)); } - private KeyedOneInputStreamOperatorTestHarness> getTestHarness( + private KeyedOneInputStreamOperatorTestHarness>> getTestHarness( int maxParallelism, int taskParallelism, int subtaskIdx) throws Exception {