flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From trohrm...@apache.org
Subject flink git commit: [hotfix] [cep] Make cep window border treatment consistent
Date Tue, 19 Apr 2016 16:40:23 GMT
Repository: flink
Updated Branches:
  refs/heads/master 716d83209 -> 7462a5bfd


[hotfix] [cep] Make cep window border treatment consistent

The NFA and the SharedBuffer treated window borders differently. Therefore, it was
possible that events were pruned even though the NFA still contained references to
these events. Now CEP windows are left inclusive and right exclusive.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7462a5bf
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7462a5bf
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7462a5bf

Branch: refs/heads/master
Commit: 7462a5bfd7cb2dafbbc9eb02a43d3db9f6add30e
Parents: 716d832
Author: Till Rohrmann <trohrmann@apache.org>
Authored: Tue Apr 19 18:37:32 2016 +0200
Committer: Till Rohrmann <trohrmann@apache.org>
Committed: Tue Apr 19 18:37:32 2016 +0200

----------------------------------------------------------------------
 .../main/java/org/apache/flink/cep/nfa/NFA.java |   2 +-
 .../org/apache/flink/cep/nfa/NFAITCase.java     |   2 +-
 .../java/org/apache/flink/cep/nfa/NFATest.java  | 123 +++++++++++++------
 3 files changed, 90 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7462a5bf/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
index 47fc7df..5824264 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
@@ -126,7 +126,7 @@ public class NFA<T> implements Serializable {
 
 			if (!computationState.isStartState() &&
 				windowTime > 0 &&
-				timestamp - computationState.getStartTimestamp() > windowTime) {
+				timestamp - computationState.getStartTimestamp() >= windowTime) {
 				// remove computation state which has exceeded the window length
 				sharedBuffer.release(computationState.getState(), computationState.getEvent(), computationState.getTimestamp());
 				sharedBuffer.remove(computationState.getState(), computationState.getEvent(), computationState.getTimestamp());

http://git-wip-us.apache.org/repos/asf/flink/blob/7462a5bf/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
index a46a81e..0a3dc7d 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
@@ -113,7 +113,7 @@ public class NFAITCase extends TestLogger {
 		events.add(new StreamEvent<Event>(startEvent = new Event(2, "start", 1.0), 2));
 		events.add(new StreamEvent<Event>(middleEvent = new Event(3, "middle", 1.0), 3));
 		events.add(new StreamEvent<Event>(new Event(4, "foobar", 1.0), 4));
-		events.add(new StreamEvent<Event>(endEvent = new Event(5, "end", 1.0), 12));
+		events.add(new StreamEvent<Event>(endEvent = new Event(5, "end", 1.0), 11));
 		events.add(new StreamEvent<Event>(new Event(6, "end", 1.0), 13));
 
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7462a5bf/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java
index 3face76..a915dee 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java
@@ -31,6 +31,7 @@ import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -109,7 +110,7 @@ public class NFATest extends TestLogger {
 
 	@Test
 	public void testTimeoutWindowPruning() {
-		NFA<Event> nfa = new NFA<>(Event.createTypeSerializer(), 2);
+		NFA<Event> nfa = createStartEndNFA(2);
 		List<StreamEvent<Event>> streamEvents = new ArrayList<>();
 
 		streamEvents.add(StreamEvent.of(new Event(1, "start", 1.0), 1L));
@@ -117,50 +118,56 @@ public class NFATest extends TestLogger {
 		streamEvents.add(StreamEvent.of(new Event(3, "start", 3.0), 3L));
 		streamEvents.add(StreamEvent.of(new Event(4, "end", 4.0), 4L));
 
-		State<Event> startingState = new State<>("", State.StateType.Start);
-		State<Event> startState = new State<>("start", State.StateType.Normal);
-		State<Event> endState = new State<>("end", State.StateType.Final);
-		StateTransition<Event> starting2Start = new StateTransition<>(
-			StateTransitionAction.TAKE,
-			startState,
-			new FilterFunction<Event>() {
-				private static final long serialVersionUID = -4869589195918650396L;
+		Set<Map<String, Event>> expectedPatterns = new HashSet<>();
 
-				@Override
-				public boolean filter(Event value) throws Exception {
-					return value.getName().equals("start");
-				}
-		});
+		Map<String, Event> secondPattern = new HashMap<>();
+		secondPattern.put("start", new Event(3, "start", 3.0));
+		secondPattern.put("end", new Event(4, "end", 4.0));
 
-		StateTransition<Event> start2End = new StateTransition<>(
-			StateTransitionAction.TAKE,
-			endState,
-			new FilterFunction<Event>() {
-				private static final long serialVersionUID = 2979804163709590673L;
+		expectedPatterns.add(secondPattern);
 
-				@Override
-				public boolean filter(Event value) throws Exception {
-					return value.getName().equals("end");
-				}
-		});
+		Collection<Map<String, Event>> actualPatterns = runNFA(nfa, streamEvents);
 
-		StateTransition<Event> start2Start = new StateTransition<>(
-			StateTransitionAction.IGNORE,
-			startState,
-			null);
+		assertEquals(expectedPatterns, actualPatterns);
+	}
 
-		startingState.addStateTransition(starting2Start);
-		startState.addStateTransition(start2End);
-		startState.addStateTransition(start2Start);
+	/**
+	 * Tests that elements whose timestamp difference is exactly the window length are not matched.
+	 * The reaon is that the right window side (later elements) is exclusive.
+	 */
+	@Test
+	public void testWindowBorders() {
+		NFA<Event> nfa = createStartEndNFA(2);
+		List<StreamEvent<Event>> streamEvents = new ArrayList<>();
 
-		nfa.addState(startingState);
-		nfa.addState(startState);
-		nfa.addState(endState);
+		streamEvents.add(StreamEvent.of(new Event(1, "start", 1.0), 1L));
+		streamEvents.add(StreamEvent.of(new Event(2, "end", 2.0), 3L));
+
+		Set<Map<String, Event>> expectedPatterns = Collections.emptySet();
+
+		Collection<Map<String, Event>> actualPatterns = runNFA(nfa, streamEvents);
+
+		assertEquals(expectedPatterns, actualPatterns);
+	}
+
+	/**
+	 * Tests that pruning shared buffer elements and computations state use the same window border
+	 * semantics (left side inclusive and right side exclusive)
+	 */
+	@Test
+	public void testTimeoutWindowPruningWindowBorders() {
+		NFA<Event> nfa = createStartEndNFA(2);
+		List<StreamEvent<Event>> streamEvents = new ArrayList<>();
+
+		streamEvents.add(StreamEvent.of(new Event(1, "start", 1.0), 1L));
+		streamEvents.add(StreamEvent.of(new Event(2, "start", 2.0), 2L));
+		streamEvents.add(StreamEvent.of(new Event(3, "foobar", 3.0), 3L));
+		streamEvents.add(StreamEvent.of(new Event(4, "end", 4.0), 3L));
 
 		Set<Map<String, Event>> expectedPatterns = new HashSet<>();
 
 		Map<String, Event> secondPattern = new HashMap<>();
-		secondPattern.put("start", new Event(3, "start", 3.0));
+		secondPattern.put("start", new Event(2, "start", 2.0));
 		secondPattern.put("end", new Event(4, "end", 4.0));
 
 		expectedPatterns.add(secondPattern);
@@ -243,6 +250,52 @@ public class NFATest extends TestLogger {
 		assertEquals(nfa, copy);
 	}
 
+	private NFA<Event> createStartEndNFA(long windowLength) {
+		NFA<Event> nfa = new NFA<>(Event.createTypeSerializer(), windowLength);
+
+		State<Event> startingState = new State<>("", State.StateType.Start);
+		State<Event> startState = new State<>("start", State.StateType.Normal);
+		State<Event> endState = new State<>("end", State.StateType.Final);
+		StateTransition<Event> starting2Start = new StateTransition<>(
+			StateTransitionAction.TAKE,
+			startState,
+			new FilterFunction<Event>() {
+				private static final long serialVersionUID = -4869589195918650396L;
+
+				@Override
+				public boolean filter(Event value) throws Exception {
+					return value.getName().equals("start");
+				}
+			});
+
+		StateTransition<Event> start2End = new StateTransition<>(
+			StateTransitionAction.TAKE,
+			endState,
+			new FilterFunction<Event>() {
+				private static final long serialVersionUID = 2979804163709590673L;
+
+				@Override
+				public boolean filter(Event value) throws Exception {
+					return value.getName().equals("end");
+				}
+			});
+
+		StateTransition<Event> start2Start = new StateTransition<>(
+			StateTransitionAction.IGNORE,
+			startState,
+			null);
+
+		startingState.addStateTransition(starting2Start);
+		startState.addStateTransition(start2End);
+		startState.addStateTransition(start2Start);
+
+		nfa.addState(startingState);
+		nfa.addState(startState);
+		nfa.addState(endState);
+
+		return nfa;
+	}
+
 	private static class NameFilter implements FilterFunction<Event> {
 
 		private static final long serialVersionUID = 7472112494752423802L;


Mime
View raw message