flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aljos...@apache.org
Subject [5/7] flink git commit: [FLINK-4162] Fix Event Queue Serialization in Abstract(Keyed)CEPPatternOperator
Date Mon, 18 Jul 2016 10:39:29 GMT
[FLINK-4162] Fix Event Queue Serialization in Abstract(Keyed)CEPPatternOperator

Previously, these operators used StreamRecordSerializer, which does not serialize
timestamps. They now use MultiplexingStreamRecordSerializer, which preserves them.

This also extends CEPOperatorTest to verify that timestamps are correctly
checkpointed and restored.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/254379b7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/254379b7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/254379b7

Branch: refs/heads/master
Commit: 254379b7809447cd566bd1d1e9bf3b51aa3e9d99
Parents: ac06146
Author: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Authored: Wed Jul 6 16:46:32 2016 +0200
Committer: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Committed: Mon Jul 18 11:16:03 2016 +0200

----------------------------------------------------------------------
 .../operator/AbstractCEPPatternOperator.java    |  10 +-
 .../AbstractKeyedCEPPatternOperator.java        |  14 ++-
 .../flink/cep/operator/CEPOperatorTest.java     | 122 ++++++++++++++++++-
 3 files changed, 133 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/254379b7/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPPatternOperator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPPatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPPatternOperator.java
index 8150eae..2e21f67 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPPatternOperator.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPPatternOperator.java
@@ -25,8 +25,9 @@ import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
 import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
 
 import java.io.IOException;
@@ -46,7 +47,7 @@ import java.util.PriorityQueue;
 abstract public class AbstractCEPPatternOperator<IN, OUT> extends AbstractCEPBasePatternOperator<IN,
OUT> {
 	private static final long serialVersionUID = 7487334510746595640L;
 
-	private final StreamRecordSerializer<IN> streamRecordSerializer;
+	private final MultiplexingStreamRecordSerializer<IN> streamRecordSerializer;
 
 	// global nfa for all elements
 	private NFA<IN> nfa;
@@ -60,7 +61,7 @@ abstract public class AbstractCEPPatternOperator<IN, OUT> extends AbstractCEPBas
 			NFACompiler.NFAFactory<IN> nfaFactory) {
 		super(inputSerializer, isProcessingTime);
 
-		this.streamRecordSerializer = new StreamRecordSerializer<>(inputSerializer);
+		this.streamRecordSerializer = new MultiplexingStreamRecordSerializer<>(inputSerializer);
 		this.nfa = nfaFactory.createNFA();
 	}
 
@@ -134,7 +135,8 @@ abstract public class AbstractCEPPatternOperator<IN, OUT> extends AbstractCEPBas
 		priorityQueue = new PriorityQueue<StreamRecord<IN>>(numberPriorityQueueEntries,
new StreamRecordComparator<IN>());
 
 		for (int i = 0; i <numberPriorityQueueEntries; i++) {
-			priorityQueue.offer(streamRecordSerializer.deserialize(div));
+			StreamElement streamElement = streamRecordSerializer.deserialize(div);
+			priorityQueue.offer(streamElement.<IN>asRecord());
 		}
 
 		div.close();

http://git-wip-us.apache.org/repos/asf/flink/blob/254379b7/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
index c384bb8..79ffb2b 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
@@ -29,8 +29,8 @@ import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.runtime.state.StateHandle;
 import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
 import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
 
 import java.io.IOException;
@@ -103,13 +103,17 @@ abstract public class AbstractKeyedCEPPatternOperator<IN, KEY, OUT> extends Abst
 						null));
 		}
 
+		@SuppressWarnings("unchecked,rawtypes")
+		TypeSerializer<StreamRecord<IN>> streamRecordSerializer =
+				(TypeSerializer) new MultiplexingStreamRecordSerializer<>(getInputSerializer());
+
 		if (priorityQueueOperatorState == null) {
 			priorityQueueOperatorState = getPartitionedState(
-					new ValueStateDescriptor<PriorityQueue<StreamRecord<IN>>>(
+					new ValueStateDescriptor<>(
 						PRIORIRY_QUEUE_STATE_NAME,
-						new PriorityQueueSerializer<StreamRecord<IN>>(
-							new StreamRecordSerializer<IN>(getInputSerializer()),
-							new PriorityQueueStreamRecordFactory<IN>()),
+						new PriorityQueueSerializer<>(
+								streamRecordSerializer,
+								new PriorityQueueStreamRecordFactory<IN>()),
 						null));
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/254379b7/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
index 0105715..1f53d97 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
@@ -28,6 +28,7 @@ import org.apache.flink.cep.nfa.NFA;
 import org.apache.flink.cep.nfa.compiler.NFACompiler;
 import org.apache.flink.cep.pattern.Pattern;
 import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
@@ -100,6 +101,94 @@ public class CEPOperatorTest extends TestLogger {
 	}
 
 	@Test
+	public void testCEPOperatorCheckpointing() throws Exception {
+		KeySelector<Event, Integer> keySelector = new KeySelector<Event, Integer>()
{
+			private static final long serialVersionUID = -4873366487571254798L;
+
+			@Override
+			public Integer getKey(Event value) throws Exception {
+				return value.getId();
+			}
+		};
+
+		OneInputStreamOperatorTestHarness<Event, Map<String, Event>> harness = new
OneInputStreamOperatorTestHarness<>(
+				new CEPPatternOperator<>(
+						Event.createTypeSerializer(),
+						false,
+						new NFAFactory()));
+
+		harness.open();
+
+		Event startEvent = new Event(42, "start", 1.0);
+		SubEvent middleEvent = new SubEvent(42, "foo", 1.0, 10.0);
+		Event endEvent=  new Event(42, "end", 1.0);
+
+		harness.processElement(new StreamRecord<Event>(startEvent, 1));
+		harness.processElement(new StreamRecord<Event>(new Event(42, "foobar", 1.0), 2));
+
+		// simulate snapshot/restore with some elements in internal sorting queue
+		StreamTaskState snapshot = harness.snapshot(0, 0);
+
+		harness = new OneInputStreamOperatorTestHarness<>(
+				new CEPPatternOperator<>(
+						Event.createTypeSerializer(),
+						false,
+						new NFAFactory()));
+
+		harness.setup();
+		harness.restore(snapshot, 1);
+		harness.open();
+
+		harness.processWatermark(new Watermark(Long.MIN_VALUE));
+
+		harness.processElement(new StreamRecord<Event>(new SubEvent(42, "barfoo", 1.0, 5.0),
3));
+
+		// if element timestamps are not correctly checkpointed/restored this will lead to
+		// a pruning time underflow exception in NFA
+		harness.processWatermark(new Watermark(2));
+
+		// simulate snapshot/restore with empty element queue but NFA state
+		StreamTaskState snapshot2 = harness.snapshot(1, 1);
+
+		harness = new OneInputStreamOperatorTestHarness<>(
+				new CEPPatternOperator<>(
+						Event.createTypeSerializer(),
+						false,
+						new NFAFactory()));
+
+		harness.setup();
+		harness.restore(snapshot2, 2);
+		harness.open();
+
+		harness.processElement(new StreamRecord<Event>(middleEvent, 3));
+		harness.processElement(new StreamRecord<Event>(new Event(42, "start", 1.0), 4));
+		harness.processElement(new StreamRecord<Event>(endEvent, 5));
+
+		harness.processWatermark(new Watermark(Long.MAX_VALUE));
+
+		ConcurrentLinkedQueue<Object> result = harness.getOutput();
+
+		// watermark and the result
+		assertEquals(2, result.size());
+
+		Object resultObject = result.poll();
+		assertTrue(resultObject instanceof StreamRecord);
+		StreamRecord<?> resultRecord = (StreamRecord<?>) resultObject;
+		assertTrue(resultRecord.getValue() instanceof Map);
+
+		@SuppressWarnings("unchecked")
+		Map<String, Event> patternMap = (Map<String, Event>) resultRecord.getValue();
+
+		assertEquals(startEvent, patternMap.get("start"));
+		assertEquals(middleEvent, patternMap.get("middle"));
+		assertEquals(endEvent, patternMap.get("end"));
+
+		harness.close();
+	}
+
+
+
+	@Test
 	public void testKeyedCEPOperatorCheckpointing() throws Exception {
 		KeySelector<Event, Integer> keySelector = new KeySelector<Event, Integer>()
{
 			private static final long serialVersionUID = -4873366487571254798L;
@@ -128,12 +217,33 @@ public class CEPOperatorTest extends TestLogger {
 
 		harness.processElement(new StreamRecord<Event>(startEvent, 1));
 		harness.processElement(new StreamRecord<Event>(new Event(42, "foobar", 1.0), 2));
+
+		// simulate snapshot/restore with some elements in internal sorting queue
+		StreamTaskState snapshot = harness.snapshot(0, 0);
+
+		harness = new OneInputStreamOperatorTestHarness<>(
+				new KeyedCEPPatternOperator<>(
+						Event.createTypeSerializer(),
+						false,
+						keySelector,
+						IntSerializer.INSTANCE,
+						new NFAFactory()));
+
+		harness.configureForKeyedStream(keySelector, BasicTypeInfo.INT_TYPE_INFO);
+		harness.setup();
+		harness.restore(snapshot, 1);
+		harness.open();
+
+		harness.processWatermark(new Watermark(Long.MIN_VALUE));
+
 		harness.processElement(new StreamRecord<Event>(new SubEvent(42, "barfoo", 1.0, 5.0),
3));
 
+		// if element timestamps are not correctly checkpointed/restored this will lead to
+		// a pruning time underflow exception in NFA
 		harness.processWatermark(new Watermark(2));
 
-		// simulate snapshot/restore
-		StreamTaskState snapshot = harness.snapshot(0, 0);
+		// simulate snapshot/restore with empty element queue but NFA state
+		StreamTaskState snapshot2 = harness.snapshot(1, 1);
 
 		harness = new OneInputStreamOperatorTestHarness<>(
 				new KeyedCEPPatternOperator<>(
@@ -145,7 +255,7 @@ public class CEPOperatorTest extends TestLogger {
 
 		harness.configureForKeyedStream(keySelector, BasicTypeInfo.INT_TYPE_INFO);
 		harness.setup();
-		harness.restore(snapshot, 1);
+		harness.restore(snapshot2, 2);
 		harness.open();
 
 		harness.processElement(new StreamRecord<Event>(middleEvent, 3));
@@ -156,6 +266,7 @@ public class CEPOperatorTest extends TestLogger {
 
 		ConcurrentLinkedQueue<Object> result = harness.getOutput();
 
+		// watermark and the result
 		assertEquals(2, result.size());
 
 		Object resultObject = result.poll();
@@ -203,7 +314,10 @@ public class CEPOperatorTest extends TestLogger {
 						public boolean filter(Event value) throws Exception {
 							return value.getName().equals("end");
 						}
-					});
+					})
+					// add a window timeout to test whether timestamps of elements in the
+					// priority queue in CEP operator are correctly checkpointed/restored
+					.within(Time.milliseconds(10));
 
 			return NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
 		}


Mime
View raw message