flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [08/13] flink git commit: [FLINK-6290] [CEP] Fix SharedBuffer release when having multiple edges between entries
Date Fri, 21 Apr 2017 12:24:22 GMT
[FLINK-6290] [CEP] Fix SharedBuffer release when having multiple edges between entries

This closes #3706


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c0ea74a0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c0ea74a0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c0ea74a0

Branch: refs/heads/master
Commit: c0ea74a0a4eedf4fe73e8f383e837ea373216476
Parents: 0506545
Author: Dawid Wysakowicz <dawid@getindata.com>
Authored: Mon Apr 10 16:57:48 2017 +0200
Committer: Stephan Ewen <sewen@apache.org>
Committed: Fri Apr 21 12:22:00 2017 +0200

----------------------------------------------------------------------
 .../org/apache/flink/cep/nfa/SharedBuffer.java  |  5 ++--
 .../apache/flink/cep/nfa/SharedBufferTest.java  | 28 ++++++++++++++++++++
 2 files changed, 30 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c0ea74a0/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
index ccc6884..43c2aca 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
@@ -274,7 +274,6 @@ public class SharedBuffer<K extends Serializable, V> implements
Serializable {
 	public void release(final K key, final V value, final long timestamp) {
 		SharedBufferEntry<K, V> entry = get(key, value, timestamp);
 		if (entry != null) {
-			entry.decreaseReferenceCounter();
 			internalRemove(entry);
 		}
 	}
@@ -493,13 +492,13 @@ public class SharedBuffer<K extends Serializable, V> implements
Serializable {
 
 		while (!entriesToRemove.isEmpty()) {
 			SharedBufferEntry<K, V> currentEntry = entriesToRemove.pop();
+			currentEntry.decreaseReferenceCounter();
 
 			if (currentEntry.getReferenceCounter() == 0) {
 				currentEntry.remove();
 
-				for (SharedBufferEdge<K, V> edge: currentEntry.getEdges()) {
+				for (SharedBufferEdge<K, V> edge : currentEntry.getEdges()) {
 					if (edge.getTarget() != null) {
-						edge.getTarget().decreaseReferenceCounter();
 						entriesToRemove.push(edge.getTarget());
 					}
 				}

http://git-wip-us.apache.org/repos/asf/flink/blob/c0ea74a0/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SharedBufferTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SharedBufferTest.java
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SharedBufferTest.java
index f0a25d2..adc07b3 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SharedBufferTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SharedBufferTest.java
@@ -32,6 +32,7 @@ import java.util.Collection;
 import java.util.Collections;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
 public class SharedBufferTest extends TestLogger {
@@ -138,4 +139,31 @@ public class SharedBufferTest extends TestLogger {
 
 		assertEquals(sharedBuffer, copy);
 	}
+
+	@Test
+	public void testClearingSharedBufferWithMultipleEdgesBetweenEntries() {
+		SharedBuffer<String, Event> sharedBuffer = new SharedBuffer<>(Event.createTypeSerializer());
+		int numberEvents = 8;
+		Event[] events = new Event[numberEvents];
+		final long timestamp = 1L;
+
+		for (int i = 0; i < numberEvents; i++) {
+			events[i] = new Event(i + 1, "e" + (i + 1), i);
+		}
+
+		sharedBuffer.put("start", events[1], timestamp, DeweyNumber.fromString("1"));
+		sharedBuffer.put("branching", events[2], timestamp, "start", events[1], timestamp, DeweyNumber.fromString("1.0"));
+		sharedBuffer.put("branching", events[3], timestamp, "start", events[1], timestamp, DeweyNumber.fromString("1.1"));
+		sharedBuffer.put("branching", events[3], timestamp, "branching", events[2], timestamp,
DeweyNumber.fromString("1.0.0"));
+		sharedBuffer.put("branching", events[4], timestamp, "branching", events[3], timestamp,
DeweyNumber.fromString("1.0.0.0"));
+		sharedBuffer.put("branching", events[4], timestamp, "branching", events[3], timestamp,
DeweyNumber.fromString("1.1.0"));
+
+		//simulate IGNORE (next event can point to events[2])
+		sharedBuffer.lock("branching", events[2], timestamp);
+
+		sharedBuffer.release("branching", events[4], timestamp);
+
+		//There should be still events[1] and events[2] in the buffer
+		assertFalse(sharedBuffer.isEmpty());
+	}
 }


Mime
View raw message