flink-commits mailing list archives

From aljos...@apache.org
Subject [7/7] flink git commit: [FLINK-4169] Fix State Handling in CEP
Date Mon, 18 Jul 2016 10:39:31 GMT
[FLINK-4169] Fix State Handling in CEP

Before, ValueState.update() was not called after changing the NFA or the
priority queue in the CEP operators. This meant that the operators did not
work with state backends such as RocksDB, which strictly require that
update() be called whenever state changes.

This change makes the operators always call update() and also introduces a
test that verifies the new behaviour.
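
To illustrate the pattern the fix enforces, the sketch below shows the
read-modify-update cycle against a keyed ValueState. It is a minimal
illustration only; the class, field, and method names are hypothetical and
are not taken from the actual operators.

import java.io.IOException;

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.cep.nfa.NFA;

// Hypothetical sketch: backends such as RocksDB hand out a deserialized copy
// from value(), so a mutation is only persisted once update() writes it back.
public abstract class NFAStateUpdateSketch<IN> {

	// keyed state holding the per-key NFA; assumed to be initialized elsewhere
	protected ValueState<NFA<IN>> nfaState;

	protected void processWithNFA(IN event, long timestamp) throws IOException {
		NFA<IN> nfa = nfaState.value();        // read the current (copied) NFA
		processEvent(nfa, event, timestamp);   // mutate the local copy
		nfaState.update(nfa);                  // write back so the change is persisted
	}

	protected abstract void processEvent(NFA<IN> nfa, IN event, long timestamp);
}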


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a710fc3d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a710fc3d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a710fc3d

Branch: refs/heads/master
Commit: a710fc3d58014a7191de61fd5d6370de97c9e75e
Parents: 4e6d89c
Author: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Authored: Thu Jul 7 16:46:29 2016 +0200
Committer: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Committed: Mon Jul 18 11:16:04 2016 +0200

----------------------------------------------------------------------
 flink-libraries/flink-cep/pom.xml               |   8 ++
 .../AbstractCEPBasePatternOperator.java         |   6 +
 .../operator/AbstractCEPPatternOperator.java    |  10 ++
 .../AbstractKeyedCEPPatternOperator.java        |  23 +++-
 .../flink/cep/operator/CEPOperatorTest.java     | 121 ++++++++++++++++++-
 5 files changed, 166 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a710fc3d/flink-libraries/flink-cep/pom.xml
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/pom.xml b/flink-libraries/flink-cep/pom.xml
index 32b26d9..5640eb8 100644
--- a/flink-libraries/flink-cep/pom.xml
+++ b/flink-libraries/flink-cep/pom.xml
@@ -83,6 +83,14 @@ under the License.
             <scope>test</scope>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-statebackend-rocksdb_2.10</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+
     </dependencies>
     
     <build>

http://git-wip-us.apache.org/repos/asf/flink/blob/a710fc3d/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPBasePatternOperator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPBasePatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPBasePatternOperator.java
index 44649ac..aad408c 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPBasePatternOperator.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPBasePatternOperator.java
@@ -58,14 +58,19 @@ public abstract class AbstractCEPBasePatternOperator<IN, OUT>
 
 	protected abstract NFA<IN> getNFA() throws IOException;
 
+	protected abstract void updateNFA(NFA<IN> nfa) throws IOException;
+
 	protected abstract PriorityQueue<StreamRecord<IN>> getPriorityQueue() throws IOException;
 
+	protected abstract void updatePriorityQueue(PriorityQueue<StreamRecord<IN>> queue) throws IOException;
+
 	@Override
 	public void processElement(StreamRecord<IN> element) throws Exception {
 		if (isProcessingTime) {
 			// there can be no out of order elements in processing time
 			NFA<IN> nfa = getNFA();
 			processEvent(nfa, element.getValue(), System.currentTimeMillis());
+			updateNFA(nfa);
 		} else {
 			PriorityQueue<StreamRecord<IN>> priorityQueue = getPriorityQueue();
 
@@ -77,6 +82,7 @@ public abstract class AbstractCEPBasePatternOperator<IN, OUT>
 			} else {
 				priorityQueue.offer(element);
 			}
+			updatePriorityQueue(priorityQueue);
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a710fc3d/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPPatternOperator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPPatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPPatternOperator.java
index 2e21f67..c26eaeb 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPPatternOperator.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPPatternOperator.java
@@ -78,11 +78,21 @@ abstract public class AbstractCEPPatternOperator<IN, OUT> extends AbstractCEPBas
 	}
 
 	@Override
+	protected void updateNFA(NFA<IN> nfa) {
+		// a no-op, because we only have one NFA
+	}
+
+	@Override
 	protected PriorityQueue<StreamRecord<IN>> getPriorityQueue() throws IOException {
 		return priorityQueue;
 	}
 
 	@Override
+	protected void updatePriorityQueue(PriorityQueue<StreamRecord<IN>> queue) {
+		// a no-op, because we only have one priority queue
+	}
+
+	@Override
 	public void processWatermark(Watermark mark) throws Exception {
 		while(!priorityQueue.isEmpty() && priorityQueue.peek().getTimestamp() <= mark.getTimestamp()) {
 			StreamRecord<IN> streamRecord = priorityQueue.poll();

http://git-wip-us.apache.org/repos/asf/flink/blob/a710fc3d/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
index 79ffb2b..206be47 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
@@ -132,6 +132,11 @@ abstract public class AbstractKeyedCEPPatternOperator<IN, KEY, OUT> extends Abst
 	}
 
 	@Override
+	protected void updateNFA(NFA<IN> nfa) throws IOException {
+		nfaOperatorState.update(nfa);
+	}
+
+	@Override
 	protected PriorityQueue<StreamRecord<IN>> getPriorityQueue() throws IOException
{
 		PriorityQueue<StreamRecord<IN>> priorityQueue = priorityQueueOperatorState.value();
 
@@ -145,6 +150,11 @@ abstract public class AbstractKeyedCEPPatternOperator<IN, KEY, OUT> extends Abst
 	}
 
 	@Override
+	protected void updatePriorityQueue(PriorityQueue<StreamRecord<IN>> queue) throws IOException {
+		priorityQueueOperatorState.update(queue);
+	}
+
+	@Override
 	public void processElement(StreamRecord<IN> element) throws Exception {
 		keys.add(keySelector.getKey(element.getValue()));
 
@@ -158,7 +168,6 @@ abstract public class AbstractKeyedCEPPatternOperator<IN, KEY, OUT> extends Abst
 			setKeyContext(key);
 
 			PriorityQueue<StreamRecord<IN>> priorityQueue = getPriorityQueue();
-
 			NFA<IN> nfa = getNFA();
 
 			while (!priorityQueue.isEmpty() && priorityQueue.peek().getTimestamp() <= mark.getTimestamp()) {
@@ -166,6 +175,8 @@ abstract public class AbstractKeyedCEPPatternOperator<IN, KEY, OUT> extends Abst
 
 				processEvent(nfa, streamRecord.getValue(), streamRecord.getTimestamp());
 			}
+			updateNFA(nfa);
+			updatePriorityQueue(priorityQueue);
 		}
 
 		output.emitWatermark(mark);
@@ -336,5 +347,15 @@ abstract public class AbstractKeyedCEPPatternOperator<IN, KEY, OUT> extends Abst
 		public PriorityQueue<StreamRecord<T>> createPriorityQueue() {
 			return new PriorityQueue<StreamRecord<T>>(INITIAL_PRIORITY_QUEUE_CAPACITY, new StreamRecordComparator<T>());
 		}
+
+		@Override
+		public boolean equals(Object obj) {
+			return obj instanceof PriorityQueueStreamRecordFactory;
+		}
+
+		@Override
+		public int hashCode() {
+			return getClass().hashCode();
+		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a710fc3d/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
index 1f53d97..56c4161 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
@@ -27,13 +27,18 @@ import org.apache.flink.cep.SubEvent;
 import org.apache.flink.cep.nfa.NFA;
 import org.apache.flink.cep.nfa.compiler.NFACompiler;
 import org.apache.flink.cep.pattern.Pattern;
+import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.util.TestLogger;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
 import static org.junit.Assert.*;
 
 import java.util.Map;
@@ -41,6 +46,9 @@ import java.util.concurrent.ConcurrentLinkedQueue;
 
 public class CEPOperatorTest extends TestLogger {
 
+	@Rule
+	public TemporaryFolder tempFolder = new TemporaryFolder();
+
 	@Test
 	public void testCEPOperatorWatermarkForwarding() throws Exception {
 		OneInputStreamOperatorTestHarness<Event, Map<String, Event>> harness = new OneInputStreamOperatorTestHarness<>(
@@ -186,10 +194,112 @@ public class CEPOperatorTest extends TestLogger {
 		harness.close();
 	}
 
+	@Test
+	public void testKeyedCEPOperatorCheckpointing() throws Exception {
+
+		KeySelector<Event, Integer> keySelector = new KeySelector<Event, Integer>() {
+			private static final long serialVersionUID = -4873366487571254798L;
+
+			@Override
+			public Integer getKey(Event value) throws Exception {
+				return value.getId();
+			}
+		};
+
+		OneInputStreamOperatorTestHarness<Event, Map<String, Event>> harness = new OneInputStreamOperatorTestHarness<>(
+				new KeyedCEPPatternOperator<>(
+						Event.createTypeSerializer(),
+						false,
+						keySelector,
+						IntSerializer.INSTANCE,
+						new NFAFactory()));
+
+		harness.configureForKeyedStream(keySelector, BasicTypeInfo.INT_TYPE_INFO);
+
+		harness.open();
+
+		Event startEvent = new Event(42, "start", 1.0);
+		SubEvent middleEvent = new SubEvent(42, "foo", 1.0, 10.0);
+		Event endEvent=  new Event(42, "end", 1.0);
+
+		harness.processElement(new StreamRecord<Event>(startEvent, 1));
+		harness.processElement(new StreamRecord<Event>(new Event(42, "foobar", 1.0), 2));
+
+		// simulate snapshot/restore with some elements in internal sorting queue
+		StreamTaskState snapshot = harness.snapshot(0, 0);
+
+		harness = new OneInputStreamOperatorTestHarness<>(
+				new KeyedCEPPatternOperator<>(
+						Event.createTypeSerializer(),
+						false,
+						keySelector,
+						IntSerializer.INSTANCE,
+						new NFAFactory()));
+
+		harness.configureForKeyedStream(keySelector, BasicTypeInfo.INT_TYPE_INFO);
+		harness.setup();
+		harness.restore(snapshot, 1);
+		harness.open();
+
+		harness.processWatermark(new Watermark(Long.MIN_VALUE));
+
+		harness.processElement(new StreamRecord<Event>(new SubEvent(42, "barfoo", 1.0, 5.0), 3));
+
+		// if element timestamps are not correctly checkpointed/restored this will lead to
+		// a pruning time underflow exception in NFA
+		harness.processWatermark(new Watermark(2));
+
+		// simulate snapshot/restore with empty element queue but NFA state
+		StreamTaskState snapshot2 = harness.snapshot(1, 1);
+
+		harness = new OneInputStreamOperatorTestHarness<>(
+				new KeyedCEPPatternOperator<>(
+						Event.createTypeSerializer(),
+						false,
+						keySelector,
+						IntSerializer.INSTANCE,
+						new NFAFactory()));
+
+		harness.configureForKeyedStream(keySelector, BasicTypeInfo.INT_TYPE_INFO);
+		harness.setup();
+		harness.restore(snapshot2, 2);
+		harness.open();
+
+		harness.processElement(new StreamRecord<Event>(middleEvent, 3));
+		harness.processElement(new StreamRecord<Event>(new Event(42, "start", 1.0), 4));
+		harness.processElement(new StreamRecord<Event>(endEvent, 5));
+
+		harness.processWatermark(new Watermark(Long.MAX_VALUE));
 
+		ConcurrentLinkedQueue<Object> result = harness.getOutput();
+
+		// watermark and the result
+		assertEquals(2, result.size());
+
+		Object resultObject = result.poll();
+		assertTrue(resultObject instanceof StreamRecord);
+		StreamRecord<?> resultRecord = (StreamRecord<?>) resultObject;
+		assertTrue(resultRecord.getValue() instanceof Map);
+
+		@SuppressWarnings("unchecked")
+		Map<String, Event> patternMap = (Map<String, Event>) resultRecord.getValue();
+
+		assertEquals(startEvent, patternMap.get("start"));
+		assertEquals(middleEvent, patternMap.get("middle"));
+		assertEquals(endEvent, patternMap.get("end"));
+
+		harness.close();
+	}
 
 	@Test
-	public void testKeyedCEPOperatorCheckpointing() throws Exception {
+	public void testKeyedCEPOperatorCheckpointingWithRocksDB() throws Exception {
+
+		String rocksDbPath = tempFolder.newFolder().getAbsolutePath();
+		String rocksDbBackups = tempFolder.newFolder().toURI().toString();
+		RocksDBStateBackend rocksDBStateBackend =
+				new RocksDBStateBackend(rocksDbBackups, new MemoryStateBackend());
+		rocksDBStateBackend.setDbStoragePath(rocksDbPath);
+
 		KeySelector<Event, Integer> keySelector = new KeySelector<Event, Integer>() {
 			private static final long serialVersionUID = -4873366487571254798L;
 
@@ -207,6 +317,7 @@ public class CEPOperatorTest extends TestLogger {
 						IntSerializer.INSTANCE,
 						new NFAFactory()));
 
+		harness.setStateBackend(rocksDBStateBackend);
 		harness.configureForKeyedStream(keySelector, BasicTypeInfo.INT_TYPE_INFO);
 
 		harness.open();
@@ -229,6 +340,10 @@ public class CEPOperatorTest extends TestLogger {
 						IntSerializer.INSTANCE,
 						new NFAFactory()));
 
+		rocksDBStateBackend =
+				new RocksDBStateBackend(rocksDbBackups, new MemoryStateBackend());
+		rocksDBStateBackend.setDbStoragePath(rocksDbPath);
+		harness.setStateBackend(rocksDBStateBackend);
 		harness.configureForKeyedStream(keySelector, BasicTypeInfo.INT_TYPE_INFO);
 		harness.setup();
 		harness.restore(snapshot, 1);
@@ -253,6 +368,10 @@ public class CEPOperatorTest extends TestLogger {
 						IntSerializer.INSTANCE,
 						new NFAFactory()));
 
+		rocksDBStateBackend =
+				new RocksDBStateBackend(rocksDbBackups, new MemoryStateBackend());
+		rocksDBStateBackend.setDbStoragePath(rocksDbPath);
+		harness.setStateBackend(rocksDBStateBackend);
 		harness.configureForKeyedStream(keySelector, BasicTypeInfo.INT_TYPE_INFO);
 		harness.setup();
 		harness.restore(snapshot2, 2);

