Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id DC0DF200C70 for ; Thu, 4 May 2017 17:51:47 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id DA9DB160BB0; Thu, 4 May 2017 15:51:47 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 05F55160B9F for ; Thu, 4 May 2017 17:51:46 +0200 (CEST) Received: (qmail 85728 invoked by uid 500); 4 May 2017 15:51:46 -0000 Mailing-List: contact commits-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.apache.org Delivered-To: mailing list commits@flink.apache.org Received: (qmail 85719 invoked by uid 99); 4 May 2017 15:51:46 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 04 May 2017 15:51:46 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 1E588DFBC8; Thu, 4 May 2017 15:51:46 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: kkloudas@apache.org To: commits@flink.apache.org Message-Id: <0c269db028304c68affbc387b40a4ef2@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: flink git commit: [FLINK-6445] [cep] Fix NPE in no-condition patterns. Date: Thu, 4 May 2017 15:51:46 +0000 (UTC) archived-at: Thu, 04 May 2017 15:51:48 -0000 Repository: flink Updated Branches: refs/heads/master a2ec3ee66 -> d6435e87c [FLINK-6445] [cep] Fix NPE in no-condition patterns. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d6435e87 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d6435e87 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d6435e87 Branch: refs/heads/master Commit: d6435e87cd4c58dfa26c2acf10474d7eb7c46f57 Parents: a2ec3ee Author: kl0u Authored: Thu May 4 15:33:40 2017 +0200 Committer: kl0u Committed: Thu May 4 15:58:47 2017 +0200 ---------------------------------------------------------------------- .../org/apache/flink/cep/pattern/Pattern.java | 7 ++- .../cep/pattern/conditions/AndCondition.java | 6 +- .../cep/pattern/conditions/NotCondition.java | 2 +- .../cep/pattern/conditions/OrCondition.java | 6 +- .../pattern/conditions/SubtypeCondition.java | 4 +- .../org/apache/flink/cep/nfa/NFAITCase.java | 66 ++++++++++++++++++++ 6 files changed, 84 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/d6435e87/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java index cef0f85..b100bc5 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java @@ -127,8 +127,9 @@ public class Pattern { * @return The pattern with the new condition is set. */ public Pattern where(IterativeCondition condition) { - ClosureCleaner.clean(condition, true); + Preconditions.checkNotNull(condition, "The condition cannot be null."); + ClosureCleaner.clean(condition, true); if (this.condition == null) { this.condition = condition; } else { @@ -148,6 +149,8 @@ public class Pattern { * @return The pattern with the new condition is set. */ public Pattern or(IterativeCondition condition) { + Preconditions.checkNotNull(condition, "The condition cannot be null."); + ClosureCleaner.clean(condition, true); if (this.condition == null) { @@ -167,6 +170,8 @@ public class Pattern { * @return The same pattern with the new subtype constraint */ public Pattern subtype(final Class subtypeClass) { + Preconditions.checkNotNull(subtypeClass, "The class cannot be null."); + if (condition == null) { this.condition = new SubtypeCondition(subtypeClass); } else { http://git-wip-us.apache.org/repos/asf/flink/blob/d6435e87/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/AndCondition.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/AndCondition.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/AndCondition.java index 5df7c66..ac34c41 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/AndCondition.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/AndCondition.java @@ -18,6 +18,8 @@ package org.apache.flink.cep.pattern.conditions; +import org.apache.flink.util.Preconditions; + /** * A {@link IterativeCondition condition} which combines two conditions with a logical * {@code AND} and returns {@code true} if both are {@code true}. @@ -32,8 +34,8 @@ public class AndCondition extends IterativeCondition { private final IterativeCondition right; public AndCondition(final IterativeCondition left, final IterativeCondition right) { - this.left = left; - this.right = right; + this.left = Preconditions.checkNotNull(left, "The condition cannot be null."); + this.right = Preconditions.checkNotNull(right, "The condition cannot be null."); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/d6435e87/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/NotCondition.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/NotCondition.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/NotCondition.java index 3e6ab56..9318c2f 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/NotCondition.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/NotCondition.java @@ -35,6 +35,6 @@ public class NotCondition extends IterativeCondition { @Override public boolean filter(T value, Context ctx) throws Exception { - return !original.filter(value, ctx); + return original != null && !original.filter(value, ctx); } } http://git-wip-us.apache.org/repos/asf/flink/blob/d6435e87/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/OrCondition.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/OrCondition.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/OrCondition.java index 6aaa4bb..d3690ab 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/OrCondition.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/OrCondition.java @@ -18,6 +18,8 @@ package org.apache.flink.cep.pattern.conditions; +import org.apache.flink.util.Preconditions; + /** * A {@link IterativeCondition condition} which combines two conditions with a logical * {@code OR} and returns {@code true} if at least one is {@code true}. @@ -32,8 +34,8 @@ public class OrCondition extends IterativeCondition { private final IterativeCondition right; public OrCondition(final IterativeCondition left, final IterativeCondition right) { - this.left = left; - this.right = right; + this.left = Preconditions.checkNotNull(left, "The condition cannot be null."); + this.right = Preconditions.checkNotNull(right, "The condition cannot be null."); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/d6435e87/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/SubtypeCondition.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/SubtypeCondition.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/SubtypeCondition.java index 91f6c21..cff8693 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/SubtypeCondition.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/SubtypeCondition.java @@ -18,6 +18,8 @@ package org.apache.flink.cep.pattern.conditions; +import org.apache.flink.util.Preconditions; + /** * A {@link IterativeCondition condition} which filters elements of the given type. * An element is filtered out iff it is not assignable to the given subtype of {@code T}. @@ -31,7 +33,7 @@ public class SubtypeCondition extends SimpleCondition { private final Class subtype; public SubtypeCondition(final Class subtype) { - this.subtype = subtype; + this.subtype = Preconditions.checkNotNull(subtype, "The subtype cannot be null."); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/d6435e87/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java index 4a00c1e..fe31564 100644 --- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java +++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java @@ -51,6 +51,72 @@ import static org.junit.Assert.assertEquals; public class NFAITCase extends TestLogger { @Test + public void testNoConditionNFA() { + List> inputEvents = new ArrayList<>(); + + Event a = new Event(40, "a", 1.0); + Event b = new Event(41, "b", 2.0); + Event c = new Event(42, "c", 3.0); + Event d = new Event(43, "d", 4.0); + Event e = new Event(44, "e", 5.0); + + inputEvents.add(new StreamRecord<>(a, 1)); + inputEvents.add(new StreamRecord<>(b, 2)); + inputEvents.add(new StreamRecord<>(c, 3)); + inputEvents.add(new StreamRecord<>(d, 4)); + inputEvents.add(new StreamRecord<>(e, 5)); + + Pattern pattern = Pattern.begin("start").followedBy("end"); + + NFA nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); + + List> resultingPatterns = feedNFA(inputEvents, nfa); + + compareMaps(resultingPatterns, Lists.>newArrayList( + Lists.newArrayList(a, b), + Lists.newArrayList(b, c), + Lists.newArrayList(c, d), + Lists.newArrayList(d, e) + )); + } + + @Test + public void testAnyWithNoConditionNFA() { + List> inputEvents = new ArrayList<>(); + + Event a = new Event(40, "a", 1.0); + Event b = new Event(41, "b", 2.0); + Event c = new Event(42, "c", 3.0); + Event d = new Event(43, "d", 4.0); + Event e = new Event(44, "e", 5.0); + + inputEvents.add(new StreamRecord<>(a, 1)); + inputEvents.add(new StreamRecord<>(b, 2)); + inputEvents.add(new StreamRecord<>(c, 3)); + inputEvents.add(new StreamRecord<>(d, 4)); + inputEvents.add(new StreamRecord<>(e, 5)); + + Pattern pattern = Pattern.begin("start").followedByAny("end"); + + NFA nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); + + List> resultingPatterns = feedNFA(inputEvents, nfa); + + compareMaps(resultingPatterns, Lists.>newArrayList( + Lists.newArrayList(a, b), + Lists.newArrayList(a, c), + Lists.newArrayList(a, d), + Lists.newArrayList(a, e), + Lists.newArrayList(b, c), + Lists.newArrayList(b, d), + Lists.newArrayList(b, e), + Lists.newArrayList(c, d), + Lists.newArrayList(c, e), + Lists.newArrayList(d, e) + )); + } + + @Test public void testSimplePatternNFA() { List> inputEvents = new ArrayList<>();