flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dwysakow...@apache.org
Subject [3/3] flink git commit: [FLINK-7061] [cep] Fix quantifier range starting from 0
Date Wed, 05 Jul 2017 09:56:30 GMT
[FLINK-7061] [cep] Fix quantifier range starting from 0

This closes #4242


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3fc96cd1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3fc96cd1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3fc96cd1

Branch: refs/heads/master
Commit: 3fc96cd1f9564a60ba5ec7f06a1fec4ab173b200
Parents: 3096bd0
Author: Dian Fu <fudian.fd@alibaba-inc.com>
Authored: Sun Jul 2 13:11:05 2017 +0800
Committer: Dawid Wysakowicz <dwysakowicz@apache.org>
Committed: Wed Jul 5 11:53:59 2017 +0200

----------------------------------------------------------------------
 .../org/apache/flink/cep/pattern/Pattern.java   |  1 +
 .../apache/flink/cep/pattern/Quantifier.java    |  3 +-
 .../apache/flink/cep/nfa/TimesRangeITCase.java  | 51 ++++++++++++++++++++
 .../apache/flink/cep/pattern/PatternTest.java   | 10 ++++
 4 files changed, 63 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/3fc96cd1/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
index f4d3404..2ffbc41 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
@@ -368,6 +368,7 @@ public class Pattern<T, F extends T> {
 		this.quantifier = Quantifier.times(quantifier.getConsumingStrategy());
 		if (from == 0) {
 			this.quantifier.optional();
+			from = 1;
 		}
 		this.times = Times.of(from, to);
 		return this;

http://git-wip-us.apache.org/repos/asf/flink/blob/3fc96cd1/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java
index c1893b4..9192a13 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java
@@ -153,9 +153,8 @@ public class Quantifier {
 		private final int to;
 
 		private Times(int from, int to) {
-			Preconditions.checkArgument(from >= 0, "The from should be a non-negative number greater
than or equal to 0.");
+			Preconditions.checkArgument(from > 0, "The from should be a positive number greater
than 0.");
 			Preconditions.checkArgument(to >= from, "The to should be a number greater than or
equal to from: " + from + ".");
-			Preconditions.checkArgument(from != to || from != 0, "The from and to should not be both
equal to 0.");
 			this.from = from;
 			this.to = to;
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/3fc96cd1/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/TimesRangeITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/TimesRangeITCase.java
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/TimesRangeITCase.java
index 4305fa2..37a9534 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/TimesRangeITCase.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/TimesRangeITCase.java
@@ -92,6 +92,57 @@ public class TimesRangeITCase extends TestLogger {
 	}
 
 	@Test
+	public void testTimesRangeFromZero() {
+		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+		Event startEvent = new Event(40, "c", 1.0);
+		Event middleEvent1 = new Event(41, "a", 2.0);
+		Event middleEvent2 = new Event(42, "a", 3.0);
+		Event middleEvent3 = new Event(43, "a", 4.0);
+		Event end1 = new Event(44, "b", 5.0);
+
+		inputEvents.add(new StreamRecord<>(startEvent, 1));
+		inputEvents.add(new StreamRecord<>(middleEvent1, 2));
+		inputEvents.add(new StreamRecord<>(middleEvent2, 3));
+		inputEvents.add(new StreamRecord<>(middleEvent3, 4));
+		inputEvents.add(new StreamRecord<>(end1, 6));
+
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>()
{
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("c");
+			}
+		}).next("middle").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("a");
+			}
+		}).times(0, 2).allowCombinations().followedBy("end1").where(new SimpleCondition<Event>()
{
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("b");
+			}
+		});
+
+		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
+
+		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
+			Lists.newArrayList(startEvent, middleEvent1, middleEvent2, end1),
+			Lists.newArrayList(startEvent, middleEvent1, middleEvent3, end1),
+			Lists.newArrayList(startEvent, middleEvent1, end1),
+			Lists.newArrayList(startEvent, end1)
+		));
+	}
+
+	@Test
 	public void testTimesRangeNonStrict() {
 		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/3fc96cd1/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/pattern/PatternTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/pattern/PatternTest.java
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/pattern/PatternTest.java
index 999e5f3..6d93ff3 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/pattern/PatternTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/pattern/PatternTest.java
@@ -195,6 +195,16 @@ public class PatternTest extends TestLogger {
 		assertEquals(previous2.getName(), "start");
 	}
 
+	@Test(expected = IllegalArgumentException.class)
+	public void testPatternTimesNegativeTimes() throws Exception {
+		Pattern.begin("start").where(dummyCondition()).times(-1);
+	}
+
+	@Test(expected = IllegalArgumentException.class)
+	public void testPatternTimesNegativeFrom() throws Exception {
+		Pattern.begin("start").where(dummyCondition()).times(-1, 2);
+	}
+
 	@Test(expected = MalformedPatternException.class)
 	public void testPatternCanHaveQuantifierSpecifiedOnce1() throws Exception {
 


Mime
View raw message