flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aljos...@apache.org
Subject [3/7] flink git commit: [FLINK-4149] Fix Serialization of NFA in AbstractKeyedCEPPatternOperator
Date Mon, 18 Jul 2016 10:39:27 GMT
[FLINK-4149] Fix Serialization of NFA in AbstractKeyedCEPPatternOperator

NFA is Serializable and has readObject()/writeObject() methods. In
AbstractKeyedCEPPatternOperator, a KryoSerializer was used as the
TypeSerializer for the ValueState that holds NFA instances. Kryo does
not call readObject()/writeObject(), so the state of the NFA was
invalid after deserialization.

This change adds a new TypeSerializer for NFA that uses
Java Serialization. In the long run it will be better to get rid of the
readObject()/writeObject() methods and instead efficiently serialize
using a specialized TypeSerializer.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ac061467
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ac061467
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ac061467

Branch: refs/heads/master
Commit: ac06146719e1061aecdc7f18c95913ac9009a711
Parents: a4c9d66
Author: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Authored: Tue Jul 5 17:58:45 2016 +0200
Committer: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Committed: Mon Jul 18 11:16:03 2016 +0200

----------------------------------------------------------------------
 .../main/java/org/apache/flink/cep/nfa/NFA.java | 110 +++++++++++++++
 .../AbstractKeyedCEPPatternOperator.java        |   3 +-
 .../flink/cep/operator/CEPOperatorTest.java     | 140 ++++++++++++++++---
 3 files changed, 231 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ac061467/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
index f769a2b..624db0d 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
@@ -21,10 +21,16 @@ package org.apache.flink.cep.nfa;
 import com.google.common.collect.LinkedHashMultimap;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
+import org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream;
 import org.apache.flink.cep.NonDuplicatingTypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
@@ -454,4 +460,108 @@ public class NFA<T> implements Serializable {
 			return name + "_" + index;
 		}
 	}
+
+	/**
+	 * {@link TypeSerializer} for {@link NFA} that uses Java Serialization.
+	 */
+	public static class Serializer<T> extends TypeSerializer<NFA<T>> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public boolean isImmutableType() {
+			return false;
+		}
+
+		@Override
+		public TypeSerializer<NFA<T>> duplicate() {
+			return this;
+		}
+
+		@Override
+		public NFA<T> createInstance() {
+			return null;
+		}
+
+		@Override
+		public NFA<T> copy(NFA<T> from) {
+			try {
+				ByteArrayOutputStream baos = new ByteArrayOutputStream();
+				ObjectOutputStream oos = new ObjectOutputStream(baos);
+
+				oos.writeObject(from);
+
+				oos.close();
+				baos.close();
+
+				byte[] data = baos.toByteArray();
+
+				ByteArrayInputStream bais = new ByteArrayInputStream(data);
+				ObjectInputStream ois = new ObjectInputStream(bais);
+
+				@SuppressWarnings("unchecked")
+				NFA<T> copy = (NFA<T>) ois.readObject();
+				return copy;
+			} catch (IOException|ClassNotFoundException e) {
+				throw new RuntimeException("Could not copy NFA.", e);
+			}
+		}
+
+		@Override
+		public NFA<T> copy(NFA<T> from, NFA<T> reuse) {
+			return copy(from);
+		}
+
+		@Override
+		public int getLength() {
+			return 0;
+		}
+
+		@Override
+		public void serialize(NFA<T> record, DataOutputView target) throws IOException {
+			ObjectOutputStream oos = new ObjectOutputStream(new DataOutputViewStream(target));
+			oos.writeObject(record);
+			oos.close();
+		}
+
+		@Override
+		public NFA<T> deserialize(DataInputView source) throws IOException {
+			ObjectInputStream ois = new ObjectInputStream(new DataInputViewStream(source));
+
+			try {
+				@SuppressWarnings("unchecked")
+				NFA<T> nfa = null;
+				nfa = (NFA<T>) ois.readObject();
+				return nfa;
+			} catch (ClassNotFoundException e) {
+				throw new RuntimeException("Could not deserialize NFA.", e);
+			}
+		}
+
+		@Override
+		public NFA<T> deserialize(NFA<T> reuse, DataInputView source) throws IOException {
+			return deserialize(source);
+		}
+
+		@Override
+		public void copy(DataInputView source, DataOutputView target) throws IOException {
+			int size = source.readInt();
+			target.writeInt(size);
+			target.write(source, size);
+		}
+
+		@Override
+		public boolean equals(Object obj) {
+			return obj instanceof Serializer && ((Serializer) obj).canEqual(this);
+		}
+
+		@Override
+		public boolean canEqual(Object obj) {
+			return obj instanceof Serializer;
+		}
+
+		@Override
+		public int hashCode() {
+			return getClass().hashCode();
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ac061467/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
index 9ffe9b6..c384bb8 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
@@ -22,7 +22,6 @@ import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
 import org.apache.flink.cep.nfa.NFA;
 import org.apache.flink.cep.nfa.compiler.NFACompiler;
 import org.apache.flink.core.memory.DataInputView;
@@ -100,7 +99,7 @@ abstract public class AbstractKeyedCEPPatternOperator<IN, KEY, OUT> extends Abst
 			nfaOperatorState = getPartitionedState(
 					new ValueStateDescriptor<NFA<IN>>(
 						NFA_OPERATOR_STATE_NAME,
-						new KryoSerializer<NFA<IN>>((Class<NFA<IN>>) (Class<?>) NFA.class, getExecutionConfig()),
+						new NFA.Serializer<IN>(),
 						null));
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ac061467/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
index d5ef5be..0105715 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
@@ -18,29 +18,35 @@
 
 package org.apache.flink.cep.operator;
 
+import org.apache.flink.api.common.functions.FilterFunction;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.cep.Event;
+import org.apache.flink.cep.SubEvent;
 import org.apache.flink.cep.nfa.NFA;
 import org.apache.flink.cep.nfa.compiler.NFACompiler;
+import org.apache.flink.cep.pattern.Pattern;
 import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.util.TestLogger;
 import org.junit.Test;
 import static org.junit.Assert.*;
 
 import java.util.Map;
+import java.util.concurrent.ConcurrentLinkedQueue;
 
 public class CEPOperatorTest extends TestLogger {
 
 	@Test
 	public void testCEPOperatorWatermarkForwarding() throws Exception {
-		OneInputStreamOperatorTestHarness<Integer, Map<String, Integer>> harness = new OneInputStreamOperatorTestHarness<>(
-			new CEPPatternOperator<Integer>(
-				IntSerializer.INSTANCE,
+		OneInputStreamOperatorTestHarness<Event, Map<String, Event>> harness = new OneInputStreamOperatorTestHarness<>(
+			new CEPPatternOperator<>(
+				Event.createTypeSerializer(),
 				false,
-				new DummyNFAFactory<>(IntSerializer.INSTANCE))
+				new NFAFactory())
 		);
 
 		harness.open();
@@ -59,22 +65,22 @@ public class CEPOperatorTest extends TestLogger {
 
 	@Test
 	public void testKeyedCEPOperatorWatermarkForwarding() throws Exception {
-		KeySelector<Integer, Integer> keySelector = new KeySelector<Integer, Integer>() {
+		KeySelector<Event, Integer> keySelector = new KeySelector<Event, Integer>() {
 			private static final long serialVersionUID = -4873366487571254798L;
 
 			@Override
-			public Integer getKey(Integer value) throws Exception {
-				return value;
+			public Integer getKey(Event value) throws Exception {
+				return value.getId();
 			}
 		};
 
-		OneInputStreamOperatorTestHarness<Integer, Map<String, Integer>> harness = new OneInputStreamOperatorTestHarness<>(
-			new KeyedCEPPatternOperator<Integer, Integer>(
-				IntSerializer.INSTANCE,
+		OneInputStreamOperatorTestHarness<Event, Map<String, Event>> harness = new OneInputStreamOperatorTestHarness<>(
+			new KeyedCEPPatternOperator<>(
+				Event.createTypeSerializer(),
 				false,
 				keySelector,
 				IntSerializer.INSTANCE,
-			new DummyNFAFactory<>(IntSerializer.INSTANCE))
+			new NFAFactory())
 		);
 
 		harness.configureForKeyedStream(keySelector, BasicTypeInfo.INT_TYPE_INFO);
@@ -93,19 +99,113 @@ public class CEPOperatorTest extends TestLogger {
 		harness.close();
 	}
 
-	public static class DummyNFAFactory<T> implements NFACompiler.NFAFactory<T> {
+	@Test
+	public void testKeyedCEPOperatorCheckpointing() throws Exception {
+		KeySelector<Event, Integer> keySelector = new KeySelector<Event, Integer>() {
+			private static final long serialVersionUID = -4873366487571254798L;
 
-		private static final long serialVersionUID = 1173020762472766713L;
+			@Override
+			public Integer getKey(Event value) throws Exception {
+				return value.getId();
+			}
+		};
 
-		private final TypeSerializer<T> inputTypeSerializer;
+		OneInputStreamOperatorTestHarness<Event, Map<String, Event>> harness = new OneInputStreamOperatorTestHarness<>(
+				new KeyedCEPPatternOperator<>(
+						Event.createTypeSerializer(),
+						false,
+						keySelector,
+						IntSerializer.INSTANCE,
+						new NFAFactory()));
 
-		public DummyNFAFactory(TypeSerializer<T> inputTypeSerializer) {
-			this.inputTypeSerializer = inputTypeSerializer;
-		}
+		harness.configureForKeyedStream(keySelector, BasicTypeInfo.INT_TYPE_INFO);
+
+		harness.open();
+
+		Event startEvent = new Event(42, "start", 1.0);
+		SubEvent middleEvent = new SubEvent(42, "foo", 1.0, 10.0);
+		Event endEvent=  new Event(42, "end", 1.0);
+
+		harness.processElement(new StreamRecord<Event>(startEvent, 1));
+		harness.processElement(new StreamRecord<Event>(new Event(42, "foobar", 1.0), 2));
+		harness.processElement(new StreamRecord<Event>(new SubEvent(42, "barfoo", 1.0, 5.0), 3));
+
+		harness.processWatermark(new Watermark(2));
+
+		// simulate snapshot/restore
+		StreamTaskState snapshot = harness.snapshot(0, 0);
+
+		harness = new OneInputStreamOperatorTestHarness<>(
+				new KeyedCEPPatternOperator<>(
+						Event.createTypeSerializer(),
+						false,
+						keySelector,
+						IntSerializer.INSTANCE,
+						new NFAFactory()));
+
+		harness.configureForKeyedStream(keySelector, BasicTypeInfo.INT_TYPE_INFO);
+		harness.setup();
+		harness.restore(snapshot, 1);
+		harness.open();
+
+		harness.processElement(new StreamRecord<Event>(middleEvent, 3));
+		harness.processElement(new StreamRecord<Event>(new Event(42, "start", 1.0), 4));
+		harness.processElement(new StreamRecord<Event>(endEvent, 5));
+
+		harness.processWatermark(new Watermark(Long.MAX_VALUE));
+
+		ConcurrentLinkedQueue<Object> result = harness.getOutput();
+
+		assertEquals(2, result.size());
+
+		Object resultObject = result.poll();
+		assertTrue(resultObject instanceof StreamRecord);
+		StreamRecord<?> resultRecord = (StreamRecord<?>) resultObject;
+		assertTrue(resultRecord.getValue() instanceof Map);
+
+		@SuppressWarnings("unchecked")
+		Map<String, Event> patternMap = (Map<String, Event>) resultRecord.getValue();
+
+		assertEquals(startEvent, patternMap.get("start"));
+		assertEquals(middleEvent, patternMap.get("middle"));
+		assertEquals(endEvent, patternMap.get("end"));
+
+		harness.close();
+	}
+
+	private static class NFAFactory implements NFACompiler.NFAFactory<Event> {
+
+		private static final long serialVersionUID = 1173020762472766713L;
 
 		@Override
-		public NFA<T> createNFA() {
-			return new NFA<>(inputTypeSerializer.duplicate(), 0, false);
+		public NFA<Event> createNFA() {
+
+			Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new FilterFunction<Event>() {
+						private static final long serialVersionUID = 5726188262756267490L;
+
+						@Override
+						public boolean filter(Event value) throws Exception {
+							return value.getName().equals("start");
+						}
+					})
+					.followedBy("middle").subtype(SubEvent.class).where(new FilterFunction<SubEvent>() {
+						private static final long serialVersionUID = 6215754202506583964L;
+
+						@Override
+						public boolean filter(SubEvent value) throws Exception {
+							return value.getVolume() > 5.0;
+						}
+					})
+					.followedBy("end").where(new FilterFunction<Event>() {
+						private static final long serialVersionUID = 7056763917392056548L;
+
+						@Override
+						public boolean filter(Event value) throws Exception {
+							return value.getName().equals("end");
+						}
+					});
+
+			return NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
 		}
 	}
 }


Mime
View raw message