flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kklou...@apache.org
Subject [8/9] flink git commit: [FLINK-6604] [cep] Remove java serialization from the library.
Date Wed, 17 May 2017 12:38:54 GMT
[FLINK-6604] [cep] Remove java serialization from the library.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7a54d05e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7a54d05e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7a54d05e

Branch: refs/heads/master
Commit: 7a54d05ecd33b6dc140a7146f0efa90d64471f47
Parents: f7ebcb0
Author: kkloudas <kkloudas@gmail.com>
Authored: Tue May 16 17:07:29 2017 +0200
Committer: kkloudas <kkloudas@gmail.com>
Committed: Wed May 17 14:37:34 2017 +0200

----------------------------------------------------------------------
 .../org/apache/flink/cep/nfa/DeweyNumber.java   |  98 ++-
 .../main/java/org/apache/flink/cep/nfa/NFA.java | 517 ++++++++++++++--
 .../org/apache/flink/cep/nfa/SharedBuffer.java  | 596 ++++++++++++++-----
 .../java/org/apache/flink/cep/nfa/State.java    |   2 +
 .../flink/cep/nfa/compiler/NFACompiler.java     |  81 ++-
 .../AbstractKeyedCEPPatternOperator.java        |  16 +-
 .../java/org/apache/flink/cep/nfa/NFATest.java  | 182 ++++--
 .../apache/flink/cep/nfa/SharedBufferTest.java  |  14 +-
 .../cep/operator/CEPFrom12MigrationTest.java    |  99 ++-
 .../cep/operator/CEPMigration11to13Test.java    | 102 +++-
 .../flink/cep/operator/CEPOperatorTest.java     | 110 +++-
 11 files changed, 1499 insertions(+), 318 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7a54d05e/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/DeweyNumber.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/DeweyNumber.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/DeweyNumber.java
index fd3fafa..3827956 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/DeweyNumber.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/DeweyNumber.java
@@ -18,6 +18,13 @@
 
 package org.apache.flink.cep.nfa;
 
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+import java.io.IOException;
 import java.io.Serializable;
 import java.util.Arrays;
 
@@ -40,14 +47,14 @@ public class DeweyNumber implements Serializable {
 		deweyNumber = new int[]{start};
 	}
 
-	protected DeweyNumber(int[] deweyNumber) {
-		this.deweyNumber = deweyNumber;
-	}
-
 	public DeweyNumber(DeweyNumber number) {
 		this.deweyNumber = Arrays.copyOf(number.deweyNumber, number.deweyNumber.length);
 	}
 
+	private DeweyNumber(int[] deweyNumber) {
+		this.deweyNumber = deweyNumber;
+	}
+
 	/**
 	 * Checks whether this dewey number is compatible to the other dewey number.
 	 *
@@ -175,4 +182,87 @@ public class DeweyNumber implements Serializable {
 			return new DeweyNumber(deweyNumber);
 		}
 	}
+
+	/**
+	 * A {@link TypeSerializer} for the {@link DeweyNumber} which serves as a version number.
+	 */
+	public static class DeweyNumberSerializer extends TypeSerializerSingleton<DeweyNumber> {
+
+		private static final long serialVersionUID = -5086792497034943656L;
+
+		private final IntSerializer elemSerializer = IntSerializer.INSTANCE;
+
+		@Override
+		public boolean isImmutableType() {
+			return false;
+		}
+
+		@Override
+		public DeweyNumber createInstance() {
+			return new DeweyNumber(1);
+		}
+
+		@Override
+		public DeweyNumber copy(DeweyNumber from) {
+			return new DeweyNumber(from);
+		}
+
+		@Override
+		public DeweyNumber copy(DeweyNumber from, DeweyNumber reuse) {
+			return copy(from);
+		}
+
+		@Override
+		public int getLength() {
+			return -1;
+		}
+
+		@Override
+		public void serialize(DeweyNumber record, DataOutputView target) throws IOException {
+			final int size = record.length();
+			target.writeInt(size);
+			for (int i = 0; i < size; i++) {
+				elemSerializer.serialize(record.deweyNumber[i], target);
+			}
+		}
+
+		@Override
+		public DeweyNumber deserialize(DataInputView source) throws IOException {
+			final int size = source.readInt();
+			int[] number = new int[size];
+			for (int i = 0; i < size; i++) {
+				number[i] = elemSerializer.deserialize(source);
+			}
+			return new DeweyNumber(number);
+		}
+
+		@Override
+		public DeweyNumber deserialize(DeweyNumber reuse, DataInputView source) throws IOException {
+			return deserialize(source);
+		}
+
+		@Override
+		public void copy(DataInputView source, DataOutputView target) throws IOException {
+			final int size = source.readInt();
+			target.writeInt(size);
+			for (int i = 0; i < size; i++) {
+				elemSerializer.copy(source, target);
+			}
+		}
+
+		@Override
+		public boolean equals(Object obj) {
+			return obj == this || (obj != null && obj.getClass().equals(getClass())); // null-safe: equals(null) must be false, not throw
+		}
+
+		@Override
+		public boolean canEqual(Object obj) {
+			return true;
+		}
+
+		@Override
+		public int hashCode() {
+			return elemSerializer.hashCode();
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7a54d05e/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
index f2ade9e..ab5cd8e 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
@@ -21,11 +21,17 @@ package org.apache.flink.cep.nfa;
 import com.google.common.base.Predicate;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.ListMultimap;
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.base.EnumSerializer;
 import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
-import org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream;
 import org.apache.flink.cep.NonDuplicatingTypeSerializer;
 import org.apache.flink.cep.nfa.compiler.NFACompiler;
 import org.apache.flink.cep.pattern.conditions.IterativeCondition;
@@ -86,19 +92,35 @@ public class NFA<T> implements Serializable {
 
 	private static final long serialVersionUID = 2957674889294717265L;
 
-	private final NonDuplicatingTypeSerializer<T> nonDuplicatingTypeSerializer;
+	/////////////////////			Backwards Compatibility Fields			/////////////////////
 
 	/**
-	 * Used only for backwards compatibility. Buffer used to store the matched events.
+	 * @deprecated Used only for backwards compatibility.
+	 * Look at the {@link #eventSharedBuffer} for its replacement.
 	 */
+	@Deprecated
 	private final SharedBuffer<State<T>, T> sharedBuffer = null;
 
 	/**
+	 * @deprecated Used only for backward compatibility.
+	 */
+	@Deprecated
+	private int startEventCounter;
+
+	/**
+	 * @deprecated Used only for backwards compatibility.
+	 */
+	@Deprecated
+	private final NonDuplicatingTypeSerializer<T> nonDuplicatingTypeSerializer;
+
+	//////////////////			End of Backwards Compatibility Fields			//////////////////
+
+	/**
 	 * A set of all the valid NFA states, as returned by the
 	 * {@link NFACompiler NFACompiler}.
 	 * These are directly derived from the user-specified pattern.
 	 */
-	private final Set<State<T>> states;
+	private Set<State<T>> states;
 
 	/**
 	 * The length of a windowed pattern, as specified using the
@@ -114,11 +136,6 @@ public class NFA<T> implements Serializable {
 	private final boolean handleTimeout;
 
 	/**
-	 * Used only for backward compatibility.
-	 */
-	private int startEventCounter;
-
-	/**
 	 * Current set of {@link ComputationState computation states} within the state machine.
 	 * These are the "active" intermediate states that are waiting for new matching
 	 * events to transition to new valid states.
@@ -126,19 +143,22 @@ public class NFA<T> implements Serializable {
 	private transient Queue<ComputationState<T>> computationStates;
 
 	/**
-	 * 	Buffer used to store the matched events.
+	 * Buffer used to store the matched events.
 	 */
-	private final SharedBuffer<String, T> stringSharedBuffer;
+	private SharedBuffer<String, T> eventSharedBuffer;
+
+	private TypeSerializer<T> eventSerializer;
 
 	public NFA(
 			final TypeSerializer<T> eventSerializer,
 			final long windowTime,
 			final boolean handleTimeout) {
 
+		this.eventSerializer = eventSerializer;
 		this.nonDuplicatingTypeSerializer = new NonDuplicatingTypeSerializer<>(eventSerializer);
 		this.windowTime = windowTime;
 		this.handleTimeout = handleTimeout;
-		this.stringSharedBuffer = new SharedBuffer<>(nonDuplicatingTypeSerializer);
+		this.eventSharedBuffer = new SharedBuffer<>(nonDuplicatingTypeSerializer);
 		this.computationStates = new LinkedList<>();
 		this.states = new HashSet<>();
 	}
@@ -169,7 +189,7 @@ public class NFA<T> implements Serializable {
 	 * {@code false} otherwise.
 	 */
 	public boolean isEmpty() {
-		return stringSharedBuffer.isEmpty();
+		return eventSharedBuffer.isEmpty();
 	}
 
 	/**
@@ -207,7 +227,7 @@ public class NFA<T> implements Serializable {
 					timeoutResult.add(Tuple2.of(timedoutPattern, timestamp));
 				}
 
-				stringSharedBuffer.release(
+				eventSharedBuffer.release(
 						computationState.getPreviousState().getName(),
 						computationState.getEvent(),
 						computationState.getTimestamp(),
@@ -231,7 +251,7 @@ public class NFA<T> implements Serializable {
 					result.add(matchedPattern);
 
 					// remove found patterns because they are no longer needed
-					stringSharedBuffer.release(
+					eventSharedBuffer.release(
 							newComputationState.getPreviousState().getName(),
 							newComputationState.getEvent(),
 							newComputationState.getTimestamp(),
@@ -239,7 +259,7 @@ public class NFA<T> implements Serializable {
 				} else if (newComputationState.isStopState()) {
 					//reached stop state. release entry for the stop state
 					shouldDiscardPath = true;
-					stringSharedBuffer.release(
+					eventSharedBuffer.release(
 						newComputationState.getPreviousState().getName(),
 						newComputationState.getEvent(),
 						newComputationState.getTimestamp(),
@@ -254,7 +274,7 @@ public class NFA<T> implements Serializable {
 				// a stop state was reached in this branch. release branch which results in removing previous event from
 				// the buffer
 				for (final ComputationState<T> state : statesToRetain) {
-					stringSharedBuffer.release(
+					eventSharedBuffer.release(
 						state.getPreviousState().getName(),
 						state.getEvent(),
 						state.getTimestamp(),
@@ -275,7 +295,7 @@ public class NFA<T> implements Serializable {
 
 				// remove all elements which are expired
 				// with respect to the window length
-				stringSharedBuffer.prune(pruningTimestamp);
+				eventSharedBuffer.prune(pruningTimestamp);
 			}
 		}
 
@@ -289,7 +309,7 @@ public class NFA<T> implements Serializable {
 			NFA<T> other = (NFA<T>) obj;
 
 			return nonDuplicatingTypeSerializer.equals(other.nonDuplicatingTypeSerializer) &&
-				stringSharedBuffer.equals(other.stringSharedBuffer) &&
+				eventSharedBuffer.equals(other.eventSharedBuffer) &&
 				states.equals(other.states) &&
 				windowTime == other.windowTime;
 		} else {
@@ -299,7 +319,7 @@ public class NFA<T> implements Serializable {
 
 	@Override
 	public int hashCode() {
-		return Objects.hash(nonDuplicatingTypeSerializer, stringSharedBuffer, states, windowTime);
+		return Objects.hash(nonDuplicatingTypeSerializer, eventSharedBuffer, states, windowTime);
 	}
 
 	private static <T> boolean isEquivalentState(final State<T> s1, final State<T> s2) {
@@ -445,14 +465,14 @@ public class NFA<T> implements Serializable {
 					final long startTimestamp;
 					if (computationState.isStartState()) {
 						startTimestamp = timestamp;
-						counter = stringSharedBuffer.put(
+						counter = eventSharedBuffer.put(
 							currentState.getName(),
 							event,
 							timestamp,
 							currentVersion);
 					} else {
 						startTimestamp = computationState.getStartTimestamp();
-						counter = stringSharedBuffer.put(
+						counter = eventSharedBuffer.put(
 							currentState.getName(),
 							event,
 							timestamp,
@@ -502,7 +522,7 @@ public class NFA<T> implements Serializable {
 
 		if (computationState.getEvent() != null) {
 			// release the shared entry referenced by the current computation state.
-			stringSharedBuffer.release(
+			eventSharedBuffer.release(
 				computationState.getPreviousState().getName(),
 				computationState.getEvent(),
 				computationState.getTimestamp(),
@@ -524,7 +544,7 @@ public class NFA<T> implements Serializable {
 		ComputationState<T> computationState = ComputationState.createState(
 				this, currentState, previousState, event, counter, timestamp, version, startTimestamp);
 		computationStates.add(computationState);
-		stringSharedBuffer.lock(previousState.getName(), event, timestamp, counter);
+		eventSharedBuffer.lock(previousState.getName(), event, timestamp, counter);
 	}
 
 	private State<T> findFinalStateAfterProceed(State<T> state, T event, ComputationState<T> computationState) {
@@ -609,7 +629,12 @@ public class NFA<T> implements Serializable {
 			return new HashMap<>();
 		}
 
-		Collection<ListMultimap<String, T>> paths = stringSharedBuffer.extractPatterns(
+		// the following is used when migrating from previous versions.
+		if (eventSerializer == null) {
+			eventSerializer = nonDuplicatingTypeSerializer.getTypeSerializer();
+		}
+
+		Collection<ListMultimap<String, T>> paths = eventSharedBuffer.extractPatterns(
 				computationState.getPreviousState().getName(),
 				computationState.getEvent(),
 				computationState.getTimestamp(),
@@ -619,19 +644,22 @@ public class NFA<T> implements Serializable {
 		// for a given computation state, we cannot have more than one matching patterns.
 		Preconditions.checkState(paths.size() <= 1);
 
-		TypeSerializer<T> serializer = nonDuplicatingTypeSerializer.getTypeSerializer();
-
 		Map<String, List<T>> result = new HashMap<>();
 		for (ListMultimap<String, T> path: paths) {
 			for (String key: path.keySet()) {
 				List<T> events = path.get(key);
 
-				List<T> values = new ArrayList<>(events.size());
+				String originalKey = NFACompiler.getOriginalStateNameFromInternal(key);
+				List<T> values = result.get(originalKey);
+				if (values == null) {
+					values = new ArrayList<>(events.size());
+				}
+
 				for (T event: events) {
 					// copy the element so that the user can change it
-					values.add(serializer.isImmutableType() ? event : serializer.copy(event));
+					values.add(eventSerializer.isImmutableType() ? event : eventSerializer.copy(event));
 				}
-				result.put(key, values);
+				result.put(originalKey, values);
 			}
 		}
 		return result;
@@ -639,18 +667,6 @@ public class NFA<T> implements Serializable {
 
 	//////////////////////			Fault-Tolerance / Migration			//////////////////////
 
-	private void writeObject(ObjectOutputStream oos) throws IOException {
-		oos.defaultWriteObject();
-
-		oos.writeInt(computationStates.size());
-
-		for(ComputationState<T> computationState: computationStates) {
-			writeComputationState(computationState, oos);
-		}
-
-		nonDuplicatingTypeSerializer.clearReferences();
-	}
-
 	private final static String BEGINNING_STATE_NAME = "$beginningState$";
 
 	private void readObject(ObjectInputStream ois) throws IOException, ClassNotFoundException {
@@ -676,7 +692,7 @@ public class NFA<T> implements Serializable {
 			try {
 				//Backwards compatibility
 				this.computationStates.addAll(migrateNFA(readComputationStates));
-				final Field newSharedBufferField = NFA.class.getDeclaredField("stringSharedBuffer");
+				final Field newSharedBufferField = NFA.class.getDeclaredField("eventSharedBuffer");
 				final Field sharedBufferField = NFA.class.getDeclaredField("sharedBuffer");
 				sharedBufferField.setAccessible(true);
 				newSharedBufferField.setAccessible(true);
@@ -760,24 +776,6 @@ public class NFA<T> implements Serializable {
 		return computationStates;
 	}
 
-	private void writeComputationState(final ComputationState<T> computationState, final ObjectOutputStream oos) throws IOException {
-		oos.writeObject(computationState.getState());
-		oos.writeObject(computationState.getPreviousState());
-		oos.writeLong(computationState.getTimestamp());
-		oos.writeObject(computationState.getVersion());
-		oos.writeLong(computationState.getStartTimestamp());
-
-		if (computationState.getEvent() == null) {
-			// write that we don't have an event associated
-			oos.writeBoolean(false);
-		} else {
-			// write that we have an event associated
-			oos.writeBoolean(true);
-			DataOutputViewStreamWrapper output = new DataOutputViewStreamWrapper(oos);
-			nonDuplicatingTypeSerializer.serialize(computationState.getEvent(), output);
-		}
-	}
-
 	@SuppressWarnings("unchecked")
 	private ComputationState<T> readComputationState(ObjectInputStream ois) throws IOException, ClassNotFoundException {
 		final State<T> state = (State<T>)ois.readObject();
@@ -805,7 +803,397 @@ public class NFA<T> implements Serializable {
 		return ComputationState.createState(this, state, previousState, event, 0, timestamp, version, startTimestamp);
 	}
 
-	//////////////////////			Serialization			//////////////////////
+	//////////////////////			New Serialization			//////////////////////
+
+	/**
+	 * The {@link TypeSerializerConfigSnapshot} serializer configuration to be stored with the managed state.
+	 */
+	public static final class NFASerializerConfigSnapshot extends CompositeTypeSerializerConfigSnapshot {
+
+		private static final int VERSION = 1;
+
+		/** This empty constructor is required for deserializing the configuration. */
+		public NFASerializerConfigSnapshot() {}
+
+		public NFASerializerConfigSnapshot(
+				TypeSerializerConfigSnapshot eventSerializerConfigSnapshot,
+				TypeSerializerConfigSnapshot sharedBufferSerializerConfigSnapshot) {
+			// nested order is [0] = event serializer, [1] = shared buffer serializer, as read back by ensureCompatibility()
+			super(eventSerializerConfigSnapshot, sharedBufferSerializerConfigSnapshot);
+		}
+
+		@Override
+		public int getVersion() {
+			return VERSION;
+		}
+	}
+
+	/**
+	 * A {@link TypeSerializer} for {@link NFA} that writes the NFA field by field (only conditions use Java Serialization).
+	 */
+	public static class NFASerializer<T> extends TypeSerializer<NFA<T>> {
+
+		private static final long serialVersionUID = 2098282423980597010L;
+
+		private final TypeSerializer<SharedBuffer<String, T>> sharedBufferSerializer;
+
+		private final TypeSerializer<T> eventSerializer;
+
+		public NFASerializer(TypeSerializer<T> typeSerializer) {
+			this(typeSerializer, new SharedBuffer.SharedBufferSerializer<>(StringSerializer.INSTANCE, typeSerializer));
+		}
+
+		public NFASerializer(
+				TypeSerializer<T> typeSerializer,
+				TypeSerializer<SharedBuffer<String, T>> sharedBufferSerializer) {
+			this.eventSerializer = typeSerializer;
+			this.sharedBufferSerializer = sharedBufferSerializer;
+		}
+
+		@Override
+		public boolean isImmutableType() {
+			return false;
+		}
+
+		@Override
+		public TypeSerializer<NFA<T>> duplicate() {
+			return new NFASerializer<>(eventSerializer.duplicate(), sharedBufferSerializer.duplicate()); // nested serializers may be stateful
+		}
+
+		@Override
+		public NFA<T> createInstance() {
+			return null;
+		}
+
+		private void readObject(ObjectInputStream ois) throws IOException, ClassNotFoundException {
+			ois.defaultReadObject();
+		}
+
+		@Override
+		public NFA<T> copy(NFA<T> from) {
+			try {
+				ByteArrayOutputStream baos = new ByteArrayOutputStream();
+				ObjectOutputStream oos = new ObjectOutputStream(baos);
+
+				serialize(from, new DataOutputViewStreamWrapper(oos));
+
+				oos.close();
+				baos.close();
+
+				byte[] data = baos.toByteArray();
+
+				ByteArrayInputStream bais = new ByteArrayInputStream(data);
+				ObjectInputStream ois = new ObjectInputStream(bais);
+
+				@SuppressWarnings("unchecked")
+				NFA<T> copy = deserialize(new DataInputViewStreamWrapper(ois));
+				ois.close();
+				bais.close();
+				return copy;
+			} catch (IOException e) {
+				throw new RuntimeException("Could not copy NFA.", e);
+			}
+		}
+
+		@Override
+		public NFA<T> copy(NFA<T> from, NFA<T> reuse) {
+			return copy(from);
+		}
+
+		@Override
+		public int getLength() {
+			return -1;
+		}
+
+		@Override
+		public void serialize(NFA<T> record, DataOutputView target) throws IOException {
+			serializeStates(record.states, target);
+			target.writeLong(record.windowTime);
+			target.writeBoolean(record.handleTimeout);
+			
+			sharedBufferSerializer.serialize(record.eventSharedBuffer, target);
+
+			target.writeInt(record.computationStates.size());
+
+			StringSerializer stateNameSerializer = StringSerializer.INSTANCE;
+			LongSerializer timestampSerializer = LongSerializer.INSTANCE;
+			DeweyNumber.DeweyNumberSerializer versionSerializer = new DeweyNumber.DeweyNumberSerializer();
+
+			for (ComputationState<T> computationState: record.computationStates) {
+				stateNameSerializer.serialize(computationState.getState().getName(), target);
+				stateNameSerializer.serialize(computationState.getPreviousState() == null
+						? null : computationState.getPreviousState().getName(), target);
+
+				timestampSerializer.serialize(computationState.getTimestamp(), target);
+				versionSerializer.serialize(computationState.getVersion(), target);
+				timestampSerializer.serialize(computationState.getStartTimestamp(), target);
+
+				if (computationState.getEvent() == null) {
+					target.writeBoolean(false);
+				} else {
+					target.writeBoolean(true);
+					eventSerializer.serialize(computationState.getEvent(), target);
+				}
+			}
+		}
+
+		@Override
+		public NFA<T> deserialize(DataInputView source) throws IOException {
+			Set<State<T>> states = deserializeStates(source);
+			long windowTime = source.readLong();
+			boolean handleTimeout = source.readBoolean();
+			
+			NFA<T> nfa = new NFA<>(eventSerializer, windowTime, handleTimeout);
+			nfa.states = states;
+			
+			nfa.eventSharedBuffer = sharedBufferSerializer.deserialize(source);
+
+			Queue<ComputationState<T>> computationStates = new LinkedList<>();
+			StringSerializer stateNameSerializer = StringSerializer.INSTANCE;
+			LongSerializer timestampSerializer = LongSerializer.INSTANCE;
+			DeweyNumber.DeweyNumberSerializer versionSerializer = new DeweyNumber.DeweyNumberSerializer();
+
+			int computationStateNo = source.readInt();
+			for (int i = 0; i < computationStateNo; i++) {
+				State<T> state = getStateByName(stateNameSerializer.deserialize(source), nfa);
+				State<T> prevState = getStateByName(stateNameSerializer.deserialize(source), nfa);
+				long timestamp = timestampSerializer.deserialize(source);
+				DeweyNumber version = versionSerializer.deserialize(source);
+				long startTimestamp = timestampSerializer.deserialize(source);
+
+				T event = null;
+				if (source.readBoolean()) {
+					event = eventSerializer.deserialize(source);
+				}
+
+				computationStates.add(ComputationState.createState(
+						nfa, state, prevState, event, 0, timestamp, version, startTimestamp));
+			}
+
+			nfa.computationStates = computationStates;
+			return nfa;
+		}
+
+		private State<T> getStateByName(String name, NFA<T> nfa) {
+			for (State<T> state: nfa.states) {
+				if (state.getName().equals(name)) {
+					return state;
+				}
+			}
+			return null;
+		}
+
+		@Override
+		public NFA<T> deserialize(NFA<T> reuse, DataInputView source) throws IOException {
+			return deserialize(source);
+		}
+
+		@Override
+		public void copy(DataInputView source, DataOutputView target) throws IOException {
+			Set<State<T>> states = deserializeStates(source);
+			serializeStates(states, target);
+
+			long windowTime = source.readLong();
+			target.writeLong(windowTime);
+
+			boolean handleTimeout = source.readBoolean();
+			target.writeBoolean(handleTimeout);
+
+			SharedBuffer<String, T> sharedBuffer = sharedBufferSerializer.deserialize(source);
+			sharedBufferSerializer.serialize(sharedBuffer, target);
+
+			StringSerializer stateNameSerializer = StringSerializer.INSTANCE;
+			LongSerializer timestampSerializer = LongSerializer.INSTANCE;
+			DeweyNumber.DeweyNumberSerializer versionSerializer = new DeweyNumber.DeweyNumberSerializer();
+
+			int computationStateNo = source.readInt();
+			target.writeInt(computationStateNo);
+
+			for (int i = 0; i < computationStateNo; i++) {
+				String stateName = stateNameSerializer.deserialize(source);
+				stateNameSerializer.serialize(stateName, target);
+
+				String prevStateName = stateNameSerializer.deserialize(source);
+				stateNameSerializer.serialize(prevStateName, target);
+
+				long timestamp = timestampSerializer.deserialize(source);
+				timestampSerializer.serialize(timestamp, target);
+
+				DeweyNumber version = versionSerializer.deserialize(source);
+				versionSerializer.serialize(version, target);
+
+				long startTimestamp = timestampSerializer.deserialize(source);
+				timestampSerializer.serialize(startTimestamp, target);
+
+				boolean hasEvent = source.readBoolean();
+				target.writeBoolean(hasEvent);
+				if (hasEvent) {
+					T event = eventSerializer.deserialize(source);
+					eventSerializer.serialize(event, target);
+				}
+			}
+		}
+
+		@Override
+		public boolean equals(Object obj) {
+			return obj == this ||
+					(obj != null && obj.getClass().equals(getClass()) &&
+							sharedBufferSerializer.equals(((NFASerializer) obj).sharedBufferSerializer) &&
+							eventSerializer.equals(((NFASerializer) obj).eventSerializer));
+		}
+
+		@Override
+		public boolean canEqual(Object obj) {
+			return true;
+		}
+
+		@Override
+		public int hashCode() {
+			return 37 * sharedBufferSerializer.hashCode() + eventSerializer.hashCode();
+		}
+
+		@Override
+		public TypeSerializerConfigSnapshot snapshotConfiguration() {
+			return new NFASerializerConfigSnapshot(
+					eventSerializer.snapshotConfiguration(),
+					sharedBufferSerializer.snapshotConfiguration()
+			);
+		}
+		public CompatibilityResult<NFA<T>> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
+			if (configSnapshot instanceof NFASerializerConfigSnapshot) {
+				TypeSerializerConfigSnapshot[] serializerConfigSnapshots =
+						((NFASerializerConfigSnapshot) configSnapshot).getNestedSerializerConfigSnapshots();
+
+				CompatibilityResult<T> elementCompatResult =
+						eventSerializer.ensureCompatibility(serializerConfigSnapshots[0]);
+				CompatibilityResult<SharedBuffer<String, T>> sharedBufCompatResult =
+						sharedBufferSerializer.ensureCompatibility(serializerConfigSnapshots[1]);
+
+				if (!sharedBufCompatResult.isRequiresMigration() && !elementCompatResult.isRequiresMigration()) {
+					return CompatibilityResult.compatible();
+				} else {
+					if (elementCompatResult.getConvertDeserializer() != null &&
+							sharedBufCompatResult.getConvertDeserializer() != null) {
+						return CompatibilityResult.requiresMigration(
+								new NFASerializer<>(
+										new TypeDeserializerAdapter<>(elementCompatResult.getConvertDeserializer()),
+										new TypeDeserializerAdapter<>(sharedBufCompatResult.getConvertDeserializer())));
+					}
+				}
+			}
+
+			return CompatibilityResult.requiresMigration(null);
+		}
+
+		private void serializeStates(Set<State<T>> states, DataOutputView out) throws IOException {
+			TypeSerializer<String> nameSerializer = StringSerializer.INSTANCE;
+			TypeSerializer<State.StateType> stateTypeSerializer = new EnumSerializer<>(State.StateType.class);
+			TypeSerializer<StateTransitionAction> actionSerializer = new EnumSerializer<>(StateTransitionAction.class);
+
+			out.writeInt(states.size());
+			for (State<T> state: states) {
+				nameSerializer.serialize(state.getName(), out);
+				stateTypeSerializer.serialize(state.getStateType(), out);
+			}
+
+			for (State<T> state: states) {
+				nameSerializer.serialize(state.getName(), out);
+
+				out.writeInt(state.getStateTransitions().size());
+				for (StateTransition<T> transition : state.getStateTransitions()) {
+					nameSerializer.serialize(transition.getSourceState().getName(), out);
+					nameSerializer.serialize(transition.getTargetState().getName(), out);
+					actionSerializer.serialize(transition.getAction(), out);
+
+					serializeCondition(transition.getCondition(), out);
+				}
+			}
+		}
+
+		private Set<State<T>> deserializeStates(DataInputView in) throws IOException {
+			TypeSerializer<String> nameSerializer = StringSerializer.INSTANCE;
+			TypeSerializer<State.StateType> stateTypeSerializer = new EnumSerializer<>(State.StateType.class);
+			TypeSerializer<StateTransitionAction> actionSerializer = new EnumSerializer<>(StateTransitionAction.class);
+
+			// first pass: names and types of all states, so that transitions can reference any of them
+			final int noOfStates = in.readInt();
+			Map<String, State<T>> states = new HashMap<>(noOfStates);
+
+			for (int i = 0; i < noOfStates; i++) {
+				String stateName = nameSerializer.deserialize(in);
+				State.StateType stateType = stateTypeSerializer.deserialize(in);
+
+				State<T> state = new State<>(stateName, stateType);
+				states.put(stateName, state);
+			}
+
+			for (int i = 0; i < noOfStates; i++) {
+				String srcName = nameSerializer.deserialize(in);
+
+				int noOfTransitions = in.readInt();
+				for (int j = 0; j < noOfTransitions; j++) {
+					String src = nameSerializer.deserialize(in);
+					Preconditions.checkState(src.equals(srcName),
+							"Source Edge names do not match (" + srcName + " - " + src + ").");
+
+					String trgt = nameSerializer.deserialize(in);
+					StateTransitionAction action = actionSerializer.deserialize(in);
+
+					IterativeCondition<T> condition;
+					try {
+						condition = deserializeCondition(in);
+					} catch (ClassNotFoundException e) {
+						throw new IOException("Could not deserialize the condition of transition " + src + " -> " + trgt + ".", e); // fail instead of restoring a null condition
+					}
+
+					State<T> srcState = states.get(src);
+					State<T> trgtState = states.get(trgt);
+					srcState.addStateTransition(action, trgtState, condition);
+				}
+
+			}
+			return new HashSet<>(states.values());
+		}
+
+		private void serializeCondition(IterativeCondition<T> condition, DataOutputView out) throws IOException {
+			out.writeBoolean(condition != null);
+			if (condition != null) {
+				ByteArrayOutputStream baos = new ByteArrayOutputStream();
+				ObjectOutputStream oos = new ObjectOutputStream(baos);
+
+				oos.writeObject(condition);
+
+				oos.close();
+				baos.close();
+
+				byte[] serCondition = baos.toByteArray();
+				out.writeInt(serCondition.length);
+				out.write(serCondition);
+			}
+		}
+
+		private IterativeCondition<T> deserializeCondition(DataInputView in) throws IOException, ClassNotFoundException {
+			boolean hasCondition = in.readBoolean();
+			if (hasCondition) {
+				int length = in.readInt();
+
+				byte[] serCondition = new byte[length];
+				in.readFully(serCondition); // read(byte[]) may return fewer bytes than requested
+
+				ByteArrayInputStream bais = new ByteArrayInputStream(serCondition);
+				ObjectInputStream ois = new ObjectInputStream(bais);
+
+				IterativeCondition<T> condition = (IterativeCondition<T>) ois.readObject();
+				ois.close();
+				bais.close();
+
+				return condition;
+			}
+			return null;
+		}
+	}
+
+	//////////////////			Old Serialization			//////////////////////
 
 	/**
 	 * A {@link TypeSerializer} for {@link NFA} that uses Java Serialization.
@@ -862,10 +1250,7 @@ public class NFA<T> implements Serializable {
 
 		@Override
 		public void serialize(NFA<T> record, DataOutputView target) throws IOException {
-			try (ObjectOutputStream oos = new ObjectOutputStream(new DataOutputViewStream(target))) {
-				oos.writeObject(record);
-				oos.flush();
-			}
+			throw new UnsupportedOperationException("This is the deprecated serialization strategy.");
 		}
 
 		@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/7a54d05e/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
index dcf5665..ab134d0 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
@@ -22,10 +22,20 @@ import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.ListMultimap;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.cep.NonDuplicatingTypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.util.Preconditions;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
@@ -35,6 +45,7 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
@@ -63,6 +74,10 @@ public class SharedBuffer<K extends Serializable, V> implements Serializable {
 
 	private static final long serialVersionUID = 9213251042562206495L;
 
+	/**
+	 * @deprecated This serializer is only used for backwards compatibility.
+	 */
+	@Deprecated
 	private final TypeSerializer<V> valueSerializer;
 
 	private transient Map<K, SharedBufferPage<K, V>> pages;
@@ -72,6 +87,12 @@ public class SharedBuffer<K extends Serializable, V> implements Serializable {
 		this.pages = new HashMap<>();
 	}
 
+	public TypeSerializer<V> getValueSerializer() {
+		return (valueSerializer instanceof NonDuplicatingTypeSerializer)
+				? ((NonDuplicatingTypeSerializer) valueSerializer).getTypeSerializer()
+				: valueSerializer;
+	}
+
 	/**
 	 * Stores given value (value + timestamp) under the given key. It assigns a preceding element
 	 * relation to the entry which is defined by the previous key, value (value + timestamp).
@@ -293,155 +314,6 @@ public class SharedBuffer<K extends Serializable, V> implements Serializable {
 		}
 	}
 
-	private void writeObject(ObjectOutputStream oos) throws IOException {
-		DataOutputViewStreamWrapper target = new DataOutputViewStreamWrapper(oos);
-		Map<SharedBufferEntry<K, V>, Integer> entryIDs = new HashMap<>();
-		int totalEdges = 0;
-		int entryCounter = 0;
-
-		oos.defaultWriteObject();
-
-		// number of pages
-		oos.writeInt(pages.size());
-
-		for (Map.Entry<K, SharedBufferPage<K, V>> pageEntry: pages.entrySet()) {
-			SharedBufferPage<K, V> page = pageEntry.getValue();
-
-			// key for the current page
-			oos.writeObject(page.getKey());
-			// number of page entries
-			oos.writeInt(page.entries.size());
-
-			for (Map.Entry<ValueTimeWrapper<V>, SharedBufferEntry<K, V>> sharedBufferEntry: page.entries.entrySet()) {
-				// serialize the sharedBufferEntry
-				SharedBufferEntry<K, V> sharedBuffer = sharedBufferEntry.getValue();
-
-				// assign id to the sharedBufferEntry for the future serialization of the previous
-				// relation
-				entryIDs.put(sharedBuffer, entryCounter++);
-
-				ValueTimeWrapper<V> valueTimeWrapper = sharedBuffer.getValueTime();
-
-				valueSerializer.serialize(valueTimeWrapper.value, target);
-				oos.writeLong(valueTimeWrapper.getTimestamp());
-				oos.writeInt(valueTimeWrapper.getCounter());
-
-				int edges = sharedBuffer.edges.size();
-				totalEdges += edges;
-
-				oos.writeInt(sharedBuffer.referenceCounter);
-			}
-		}
-
-		// write the edges between the shared buffer entries
-		oos.writeInt(totalEdges);
-
-		for (Map.Entry<K, SharedBufferPage<K, V>> pageEntry: pages.entrySet()) {
-			SharedBufferPage<K, V> page = pageEntry.getValue();
-
-			for (Map.Entry<ValueTimeWrapper<V>, SharedBufferEntry<K, V>> sharedBufferEntry: page.entries.entrySet()) {
-				SharedBufferEntry<K, V> sharedBuffer = sharedBufferEntry.getValue();
-
-				if (!entryIDs.containsKey(sharedBuffer)) {
-					throw new RuntimeException("Could not find id for entry: " + sharedBuffer);
-				} else {
-					int id = entryIDs.get(sharedBuffer);
-
-					for (SharedBufferEdge<K, V> edge: sharedBuffer.edges) {
-						// in order to serialize the previous relation we simply serialize the ids
-						// of the source and target SharedBufferEntry
-						if (edge.target != null) {
-							if (!entryIDs.containsKey(edge.getTarget())) {
-								throw new RuntimeException("Could not find id for entry: " + edge.getTarget());
-							} else {
-								int targetId = entryIDs.get(edge.getTarget());
-
-								oos.writeInt(id);
-								oos.writeInt(targetId);
-								oos.writeObject(edge.version);
-							}
-						} else {
-							oos.writeInt(id);
-							oos.writeInt(-1);
-							oos.writeObject(edge.version);
-						}
-					}
-				}
-			}
-		}
-	}
-
-	private void readObject(ObjectInputStream ois) throws IOException, ClassNotFoundException {
-		DataInputViewStreamWrapper source = new DataInputViewStreamWrapper(ois);
-		ArrayList<SharedBufferEntry<K, V>> entryList = new ArrayList<>();
-		ois.defaultReadObject();
-
-		this.pages = new HashMap<>();
-
-		int numberPages = ois.readInt();
-
-		for (int i = 0; i < numberPages; i++) {
-			// key of the page
-			@SuppressWarnings("unchecked")
-			K key = (K)ois.readObject();
-
-			SharedBufferPage<K, V> page = new SharedBufferPage<>(key);
-
-			pages.put(key, page);
-
-			int numberEntries = ois.readInt();
-
-			for (int j = 0; j < numberEntries; j++) {
-				// restore the SharedBufferEntries for the given page
-				V value = valueSerializer.deserialize(source);
-				long timestamp = ois.readLong();
-				int counter = ois.readInt();
-
-				ValueTimeWrapper<V> valueTimeWrapper = new ValueTimeWrapper<>(value, timestamp, counter);
-				SharedBufferEntry<K, V> sharedBufferEntry = new SharedBufferEntry<K, V>(valueTimeWrapper, page);
-
-				sharedBufferEntry.referenceCounter = ois.readInt();
-
-				page.entries.put(valueTimeWrapper, sharedBufferEntry);
-
-				entryList.add(sharedBufferEntry);
-			}
-		}
-
-		// read the edges of the shared buffer entries
-		int numberEdges = ois.readInt();
-
-		for (int j = 0; j < numberEdges; j++) {
-			int sourceIndex = ois.readInt();
-			int targetIndex = ois.readInt();
-
-			if (sourceIndex >= entryList.size() || sourceIndex < 0) {
-				throw new RuntimeException("Could not find source entry with index " + sourceIndex +
-					". This indicates a corrupted state.");
-			} else {
-				// We've already deserialized the shared buffer entry. Simply read its ID and
-				// retrieve the buffer entry from the list of entries
-				SharedBufferEntry<K, V> sourceEntry = entryList.get(sourceIndex);
-
-				final DeweyNumber version = (DeweyNumber) ois.readObject();
-				final SharedBufferEntry<K, V> target;
-
-				if (targetIndex >= 0) {
-					if (targetIndex >= entryList.size()) {
-						throw new RuntimeException("Could not find target entry with index " + targetIndex +
-							". This indicates a corrupted state.");
-					} else {
-						target = entryList.get(targetIndex);
-					}
-				} else {
-					target = null;
-				}
-
-				sourceEntry.edges.add(new SharedBufferEdge<K, V>(target, version));
-			}
-		}
-	}
-
 	private SharedBuffer(
 		TypeSerializer<V> valueSerializer,
 		Map<K, SharedBufferPage<K, V>> pages) {
@@ -523,7 +395,7 @@ public class SharedBuffer<K extends Serializable, V> implements Serializable {
 	public String toString() {
 		StringBuilder builder = new StringBuilder();
 
-		for(Map.Entry<K, SharedBufferPage<K, V>> entry :pages.entrySet()){
+		for(Map.Entry<K, SharedBufferPage<K, V>> entry: pages.entrySet()){
 			builder.append("Key: ").append(entry.getKey()).append("\n");
 			builder.append("Value: ").append(entry.getValue()).append("\n");
 		}
@@ -537,7 +409,7 @@ public class SharedBuffer<K extends Serializable, V> implements Serializable {
 			@SuppressWarnings("unchecked")
 			SharedBuffer<K, V> other = (SharedBuffer<K, V>) obj;
 
-			return pages.equals(other.pages) && valueSerializer.equals(other.valueSerializer);
+			return pages.equals(other.pages) && getValueSerializer().equals(other.getValueSerializer());
 		} else {
 			return false;
 		}
@@ -545,7 +417,7 @@ public class SharedBuffer<K extends Serializable, V> implements Serializable {
 
 	@Override
 	public int hashCode() {
-		return Objects.hash(pages, valueSerializer);
+		return Objects.hash(pages, getValueSerializer());
 	}
 
 	/**
@@ -931,4 +803,424 @@ public class SharedBuffer<K extends Serializable, V> implements Serializable {
 			return "ExtractionState(" + entry + ", " + version + ", [" +  StringUtils.join(path, ", ") + "])";
 		}
 	}
+
+	//////////////				New Serialization				////////////////////
+
+	/**
+	 * The {@link TypeSerializerConfigSnapshot} serializer configuration to be stored with the managed state.
+	 */
+	public static final class SharedBufferSerializerConfigSnapshot extends CompositeTypeSerializerConfigSnapshot {
+
+		private static final int VERSION = 1;
+
+		/** This empty constructor is required for deserializing the configuration. */
+		public SharedBufferSerializerConfigSnapshot() {}
+
+		public SharedBufferSerializerConfigSnapshot(
+				TypeSerializerConfigSnapshot keySerializerConfigSnapshot,
+				TypeSerializerConfigSnapshot valueSerializerConfigSnapshot,
+				TypeSerializerConfigSnapshot versionSerializerConfigSnapshot) {
+
+			super(keySerializerConfigSnapshot, valueSerializerConfigSnapshot, versionSerializerConfigSnapshot);
+		}
+
+		@Override
+		public int getVersion() {
+			return VERSION;
+		}
+	}
+
+	/**
+	 * A {@link TypeSerializer} for the {@link SharedBuffer}.
+	 */
+	public static class SharedBufferSerializer<K extends Serializable, V> extends TypeSerializer<SharedBuffer<K, V>> {
+
+		private static final long serialVersionUID = -3254176794680331560L;
+
+		private final TypeSerializer<K> keySerializer;
+		private final TypeSerializer<V> valueSerializer;
+		private final TypeSerializer<DeweyNumber> versionSerializer;
+
+		public SharedBufferSerializer(
+				TypeSerializer<K> keySerializer,
+				TypeSerializer<V> valueSerializer) {
+			this(keySerializer, valueSerializer, new DeweyNumber.DeweyNumberSerializer());
+		}
+
+		public SharedBufferSerializer(
+				TypeSerializer<K> keySerializer,
+				TypeSerializer<V> valueSerializer,
+				TypeSerializer<DeweyNumber> versionSerializer) {
+
+			this.keySerializer = keySerializer;
+			this.valueSerializer = valueSerializer;
+			this.versionSerializer = versionSerializer;
+		}
+
+		@Override
+		public boolean isImmutableType() {
+			return false;
+		}
+
+		@Override
+		public TypeSerializer<SharedBuffer<K, V>> duplicate() { // nested serializers may be stateful, so each one is duplicated
+			return new SharedBufferSerializer<>(keySerializer.duplicate(), valueSerializer.duplicate(), versionSerializer.duplicate());
+		}
+
+		@Override
+		public SharedBuffer<K, V> createInstance() {
+			return new SharedBuffer<>(new NonDuplicatingTypeSerializer<V>(valueSerializer));
+		}
+
+		@Override
+		public SharedBuffer<K, V> copy(SharedBuffer from) {
+			try {
+				ByteArrayOutputStream baos = new ByteArrayOutputStream();
+				ObjectOutputStream oos = new ObjectOutputStream(baos);
+				// the copy is produced by a serialize()/deserialize() round trip through an in-memory buffer
+				serialize(from, new DataOutputViewStreamWrapper(oos));
+
+				oos.close();
+				baos.close();
+
+				byte[] data = baos.toByteArray();
+
+				ByteArrayInputStream bais = new ByteArrayInputStream(data);
+				ObjectInputStream ois = new ObjectInputStream(bais);
+
+				@SuppressWarnings("unchecked")
+				SharedBuffer<K, V> copy = deserialize(new DataInputViewStreamWrapper(ois));
+				ois.close();
+				bais.close();
+
+				return copy;
+			} catch (IOException e) {
+				throw new RuntimeException("Could not copy SharedBuffer.", e);
+			}
+		}
+
+		@Override
+		public SharedBuffer<K, V> copy(SharedBuffer from, SharedBuffer reuse) {
+			return copy(from);
+		}
+
+		@Override
+		public int getLength() {
+			return -1;
+		}
+
+		@Override
+		public void serialize(SharedBuffer record, DataOutputView target) throws IOException {
+			Map<K, SharedBufferPage<K, V>> pages = record.pages;
+			Map<SharedBufferEntry<K, V>, Integer> entryIDs = new HashMap<>();
+
+			int totalEdges = 0;
+			int entryCounter = 0;
+
+			// number of pages
+			target.writeInt(pages.size());
+
+			for (Map.Entry<K, SharedBufferPage<K, V>> pageEntry: pages.entrySet()) {
+				SharedBufferPage<K, V> page = pageEntry.getValue();
+
+				// key for the current page
+				keySerializer.serialize(page.getKey(), target);
+				
+				// number of page entries
+				target.writeInt(page.entries.size());
+
+				for (Map.Entry<ValueTimeWrapper<V>, SharedBufferEntry<K, V>> sharedBufferEntry: page.entries.entrySet()) {
+					SharedBufferEntry<K, V> sharedBuffer = sharedBufferEntry.getValue();
+
+					// assign id to the sharedBufferEntry for the future
+					// serialization of the previous relation
+					entryIDs.put(sharedBuffer, entryCounter++);
+
+					ValueTimeWrapper<V> valueTimeWrapper = sharedBuffer.getValueTime();
+
+					valueSerializer.serialize(valueTimeWrapper.value, target);
+					target.writeLong(valueTimeWrapper.getTimestamp());
+					target.writeInt(valueTimeWrapper.getCounter());
+
+					int edges = sharedBuffer.edges.size();
+					totalEdges += edges;
+
+					target.writeInt(sharedBuffer.referenceCounter);
+				}
+			}
+
+			// write the edges between the shared buffer entries
+			target.writeInt(totalEdges);
+
+			for (Map.Entry<K, SharedBufferPage<K, V>> pageEntry: pages.entrySet()) {
+				SharedBufferPage<K, V> page = pageEntry.getValue();
+
+				for (Map.Entry<ValueTimeWrapper<V>, SharedBufferEntry<K, V>> sharedBufferEntry: page.entries.entrySet()) {
+					SharedBufferEntry<K, V> sharedBuffer = sharedBufferEntry.getValue();
+
+					Integer id = entryIDs.get(sharedBuffer);
+					Preconditions.checkState(id != null, "Could not find id for entry: " + sharedBuffer);
+
+					for (SharedBufferEdge<K, V> edge: sharedBuffer.edges) {
+						// in order to serialize the previous relation we simply serialize the ids
+						// of the source and target SharedBufferEntry
+						if (edge.target != null) {
+							Integer targetId = entryIDs.get(edge.getTarget());
+							Preconditions.checkState(targetId != null,
+									"Could not find id for entry: " + edge.getTarget());
+
+							target.writeInt(id);
+							target.writeInt(targetId);
+							versionSerializer.serialize(edge.version, target);
+						} else {
+							target.writeInt(id);
+							target.writeInt(-1);
+							versionSerializer.serialize(edge.version, target);
+						}
+					}
+				}
+			}
+		}
+
+		@Override
+		public SharedBuffer deserialize(DataInputView source) throws IOException {
+			List<SharedBufferEntry<K, V>> entryList = new ArrayList<>();
+			Map<K, SharedBufferPage<K, V>> pages = new HashMap<>();
+
+			int totalPages = source.readInt();
+
+			for (int i = 0; i < totalPages; i++) {
+				// key of the page
+				@SuppressWarnings("unchecked")
+				K key = keySerializer.deserialize(source);
+
+				SharedBufferPage<K, V> page = new SharedBufferPage<>(key);
+
+				pages.put(key, page);
+
+				int numberEntries = source.readInt();
+
+				for (int j = 0; j < numberEntries; j++) {
+					// restore the SharedBufferEntries for the given page
+					V value = valueSerializer.deserialize(source);
+					long timestamp = source.readLong();
+					int counter = source.readInt();
+
+					ValueTimeWrapper<V> valueTimeWrapper = new ValueTimeWrapper<>(value, timestamp, counter);
+					SharedBufferEntry<K, V> sharedBufferEntry = new SharedBufferEntry<K, V>(valueTimeWrapper, page);
+
+					sharedBufferEntry.referenceCounter = source.readInt();
+
+					page.entries.put(valueTimeWrapper, sharedBufferEntry);
+
+					entryList.add(sharedBufferEntry);
+				}
+			}
+
+			// read the edges of the shared buffer entries
+			int totalEdges = source.readInt();
+
+			for (int j = 0; j < totalEdges; j++) {
+				int sourceIndex = source.readInt();
+				Preconditions.checkState(sourceIndex < entryList.size() && sourceIndex >= 0,
+						"Could not find source entry with index " + sourceIndex + ". This indicates a corrupted state.");
+				// a negative target index (serialize() writes -1) marks an edge without a target entry
+				int targetIndex = source.readInt();
+				Preconditions.checkState(targetIndex < entryList.size(),
+						"Could not find target entry with index " + targetIndex + ". This indicates a corrupted state.");
+
+				DeweyNumber version = versionSerializer.deserialize(source);
+
+				// We've already deserialized the shared buffer entry. Simply read its ID and
+				// retrieve the buffer entry from the list of entries
+				SharedBufferEntry<K, V> sourceEntry = entryList.get(sourceIndex);
+				SharedBufferEntry<K, V> targetEntry = targetIndex < 0 ? null : entryList.get(targetIndex);
+
+				sourceEntry.edges.add(new SharedBufferEdge<>(targetEntry, version));
+			}
+			// here we put the old NonDuplicating serializer because this needs to create a copy
+			// of the buffer, as created by the NFA. There, for compatibility reasons, we have left
+			// the old serializer.
+			return new SharedBuffer(new NonDuplicatingTypeSerializer(valueSerializer), pages);
+		}
+
+		@Override
+		public SharedBuffer deserialize(SharedBuffer reuse, DataInputView source) throws IOException {
+			return deserialize(source);
+		}
+
+		@Override
+		public void copy(DataInputView source, DataOutputView target) throws IOException {
+			int numberPages = source.readInt();
+			target.writeInt(numberPages);
+
+			for (int i = 0; i < numberPages; i++) {
+				// key of the page
+				@SuppressWarnings("unchecked")
+				K key = keySerializer.deserialize(source);
+				keySerializer.serialize(key, target);
+
+				int numberEntries = source.readInt();
+
+				for (int j = 0; j < numberEntries; j++) {
+					// restore the SharedBufferEntries for the given page
+					V value = valueSerializer.deserialize(source);
+					valueSerializer.serialize(value, target);
+
+					long timestamp = source.readLong();
+					target.writeLong(timestamp);
+
+					int counter = source.readInt();
+					target.writeInt(counter);
+
+					int referenceCounter = source.readInt();
+					target.writeInt(referenceCounter);
+				}
+			}
+
+			// read the edges of the shared buffer entries
+			int numberEdges = source.readInt();
+			target.writeInt(numberEdges);
+
+			for (int j = 0; j < numberEdges; j++) {
+				int sourceIndex = source.readInt();
+				int targetIndex = source.readInt();
+
+				target.writeInt(sourceIndex);
+				target.writeInt(targetIndex);
+
+				DeweyNumber version = versionSerializer.deserialize(source);
+				versionSerializer.serialize(version, target);
+			}
+		}
+
+		@Override
+		public boolean equals(Object obj) {
+			return obj == this ||
+					(obj != null && obj.getClass().equals(getClass()) &&
+							keySerializer.equals(((SharedBufferSerializer<?, ?>) obj).keySerializer) &&
+							valueSerializer.equals(((SharedBufferSerializer<?, ?>) obj).valueSerializer) &&
+							versionSerializer.equals(((SharedBufferSerializer<?, ?>) obj).versionSerializer));
+		}
+
+		@Override
+		public boolean canEqual(Object obj) {
+			return true;
+		}
+
+		@Override
+		public int hashCode() {
+			return 37 * keySerializer.hashCode() + valueSerializer.hashCode();
+		}
+
+		@Override
+		public TypeSerializerConfigSnapshot snapshotConfiguration() {
+			return new SharedBufferSerializerConfigSnapshot(
+					keySerializer.snapshotConfiguration(),
+					valueSerializer.snapshotConfiguration(),
+					versionSerializer.snapshotConfiguration()
+			);
+		}
+
+		@Override
+		public CompatibilityResult<SharedBuffer<K, V>> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
+			if (configSnapshot instanceof SharedBufferSerializerConfigSnapshot) {
+				TypeSerializerConfigSnapshot[] serializerConfigSnapshots =
+						((SharedBufferSerializerConfigSnapshot) configSnapshot).getNestedSerializerConfigSnapshots();
+
+				CompatibilityResult<K> keyCompatResult = keySerializer.ensureCompatibility(serializerConfigSnapshots[0]);
+				CompatibilityResult<V> valueCompatResult = valueSerializer.ensureCompatibility(serializerConfigSnapshots[1]);
+				CompatibilityResult<DeweyNumber> versionCompatResult = versionSerializer.ensureCompatibility(serializerConfigSnapshots[2]);
+
+				if (!keyCompatResult.isRequiresMigration() && !valueCompatResult.isRequiresMigration() && !versionCompatResult.isRequiresMigration()) {
+					return CompatibilityResult.compatible();
+				} else {
+					if (keyCompatResult.getConvertDeserializer() != null
+							&& valueCompatResult.getConvertDeserializer() != null
+							&& versionCompatResult.getConvertDeserializer() != null) {
+						return CompatibilityResult.requiresMigration(
+								new SharedBufferSerializer<>(
+										new TypeDeserializerAdapter<>(keyCompatResult.getConvertDeserializer()),
+										new TypeDeserializerAdapter<>(valueCompatResult.getConvertDeserializer()),
+										new TypeDeserializerAdapter<>(versionCompatResult.getConvertDeserializer())
+								));
+					}
+				}
+			}
+
+			return CompatibilityResult.requiresMigration(null);
+		}
+	}
+
+	//////////////////			Java Serialization methods for backwards compatibility			//////////////////
+
+	private void readObject(ObjectInputStream ois) throws IOException, ClassNotFoundException {
+		DataInputViewStreamWrapper source = new DataInputViewStreamWrapper(ois);
+		ArrayList<SharedBufferEntry<K, V>> entryList = new ArrayList<>();
+		ois.defaultReadObject();
+
+		this.pages = new HashMap<>();
+
+		int numberPages = ois.readInt();
+
+		for (int i = 0; i < numberPages; i++) {
+			// key of the page
+			@SuppressWarnings("unchecked")
+			K key = (K)ois.readObject();
+
+			SharedBufferPage<K, V> page = new SharedBufferPage<>(key);
+
+			pages.put(key, page);
+
+			int numberEntries = ois.readInt();
+
+			for (int j = 0; j < numberEntries; j++) {
+				// restore the SharedBufferEntries for the given page
+				V value = valueSerializer.deserialize(source);
+				long timestamp = ois.readLong();
+
+				ValueTimeWrapper<V> valueTimeWrapper = new ValueTimeWrapper<>(value, timestamp, 0);
+				SharedBufferEntry<K, V> sharedBufferEntry = new SharedBufferEntry<K, V>(valueTimeWrapper, page);
+
+				sharedBufferEntry.referenceCounter = ois.readInt();
+
+				page.entries.put(valueTimeWrapper, sharedBufferEntry);
+
+				entryList.add(sharedBufferEntry);
+			}
+		}
+
+		// read the edges of the shared buffer entries
+		int numberEdges = ois.readInt();
+
+		for (int j = 0; j < numberEdges; j++) {
+			int sourceIndex = ois.readInt();
+			int targetIndex = ois.readInt();
+
+			if (sourceIndex >= entryList.size() || sourceIndex < 0) {
+				throw new RuntimeException("Could not find source entry with index " + sourceIndex +
+						". This indicates a corrupted state.");
+			} else {
+				// We've already deserialized the shared buffer entry. Simply read its ID and
+				// retrieve the buffer entry from the list of entries
+				SharedBufferEntry<K, V> sourceEntry = entryList.get(sourceIndex);
+
+				final DeweyNumber version = (DeweyNumber) ois.readObject();
+				final SharedBufferEntry<K, V> target;
+
+				if (targetIndex >= 0) {
+					if (targetIndex >= entryList.size()) {
+						throw new RuntimeException("Could not find target entry with index " + targetIndex +
+								". This indicates a corrupted state.");
+					} else {
+						target = entryList.get(targetIndex);
+					}
+				} else {
+					target = null;
+				}
+
+				sourceEntry.edges.add(new SharedBufferEdge<K, V>(target, version));
+			}
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7a54d05e/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/State.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/State.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/State.java
index 3d11538..14395b1 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/State.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/State.java
@@ -146,6 +146,8 @@ public class State<T> implements Serializable {
 		Stop
 	}
 
+	////////////////			Backwards Compatibility			////////////////////
+
 	private void readObject(ObjectInputStream ois) throws IOException, ClassNotFoundException {
 		ois.defaultReadObject();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7a54d05e/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
index 39c18b9..1b31485 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
@@ -44,6 +44,7 @@ import org.apache.flink.cep.pattern.conditions.BooleanConditions;
 import org.apache.flink.cep.pattern.conditions.IterativeCondition;
 import org.apache.flink.cep.pattern.conditions.NotCondition;
 import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.util.Preconditions;
 
 /**
  * Compiler class containing methods to compile a {@link Pattern} into a {@link NFA} or a
@@ -53,6 +54,8 @@ public class NFACompiler {
 
 	protected static final String ENDING_STATE_NAME = "$endState$";
 
+	protected static final String STATE_NAME_DELIM = ":";
+
 	/**
 	 * Compiles the given pattern into a {@link NFA}.
 	 *
@@ -71,6 +74,11 @@ public class NFACompiler {
 		return factory.createNFA();
 	}
 
+	public static String getOriginalStateNameFromInternal(String internalName) {
+		Preconditions.checkNotNull(internalName);
+		return internalName.split(STATE_NAME_DELIM)[0];
+	}
+
 	/**
 	 * Compiles the given pattern into a {@link NFAFactory}. The NFA factory can be used to create
 	 * multiple NFAs.
@@ -178,10 +186,7 @@ public class NFACompiler {
 		 * @return dummy Final state
 		 */
 		private State<T> createEndingState() {
-			checkPatternNameUniqueness(ENDING_STATE_NAME);
-			State<T> endState = new State<>(ENDING_STATE_NAME, State.StateType.Final);
-			states.add(endState);
-
+			State<T> endState = createState(ENDING_STATE_NAME, State.StateType.Final);
 			windowTime = currentPattern.getWindowTime() != null ? currentPattern.getWindowTime().toMilliseconds() : 0L;
 			return endState;
 		}
@@ -199,7 +204,8 @@ public class NFACompiler {
 				if (currentPattern.getQuantifier().getConsumingStrategy() == Quantifier.ConsumingStrategy.NOT_FOLLOW) {
 					//skip notFollow patterns, they are converted into edge conditions
 				} else if (currentPattern.getQuantifier().getConsumingStrategy() == Quantifier.ConsumingStrategy.NOT_NEXT) {
-					final State<T> notNext = createNormalState();
+					checkPatternNameUniqueness(currentPattern.getName());
+					final State<T> notNext = createState(currentPattern.getName(), State.StateType.Normal);
 					final IterativeCondition<T> notCondition = (IterativeCondition<T>) currentPattern.getCondition();
 					final State<T> stopState = createStopState(notCondition, currentPattern.getName());
 
@@ -212,6 +218,7 @@ public class NFACompiler {
 					notNext.addProceed(stopState, notCondition);
 					lastSink = notNext;
 				} else {
+					checkPatternNameUniqueness(currentPattern.getName());
 					lastSink = convertPattern(lastSink);
 				}
 
@@ -236,6 +243,7 @@ public class NFACompiler {
 		 */
 		@SuppressWarnings("unchecked")
 		private State<T> createStartState(State<T> sinkState) {
+			checkPatternNameUniqueness(currentPattern.getName());
 			final State<T> beginningState = convertPattern(sinkState);
 			beginningState.makeStart();
 			return beginningState;
@@ -243,7 +251,6 @@ public class NFACompiler {
 
 		private State<T> convertPattern(final State<T> sinkState) {
 			final State<T> lastSink;
-			checkPatternNameUniqueness(currentPattern.getName());
 
 			final Quantifier quantifier = currentPattern.getQuantifier();
 			if (quantifier.hasProperty(Quantifier.QuantifierProperty.LOOPING)) {
@@ -273,18 +280,42 @@ public class NFACompiler {
 		 *
 		 * @return the created state
 		 */
-		private State<T> createNormalState() {
-			final State<T> state = new State<>(currentPattern.getName(), State.StateType.Normal);
+		private State<T> createState(String name, State.StateType stateType) {
+			String stateName = getUniqueInternalStateName(name);
+			usedNames.add(stateName);
+			State<T> state = new State<>(stateName, stateType);
 			states.add(state);
 			return state;
 		}
 
+		/**
+		 * Used to give a unique name to states created
+		 * during the translation process.
+		 *
+		 * @param baseName The base of the name.
+		 */
+		private String getUniqueInternalStateName(String baseName) {
+			int counter = 0;
+			String candidate = baseName;
+			while (usedNames.contains(candidate)) {
+				candidate = baseName + STATE_NAME_DELIM + counter++;
+			}
+			return candidate;
+		}
+
+		private void checkPatternNameUniqueness(String patternName) {
+			if (usedNames.contains(patternName)) {
+				throw new MalformedPatternException(
+						"Duplicate pattern name: " + patternName + ". " +
+								"Pattern names must be unique.");
+			}
+		}
+
 		private State<T> createStopState(final IterativeCondition<T> notCondition, final String name) {
 			// We should not duplicate the notStates. All states from which we can stop should point to the same one.
 			State<T> stopState = stopStates.get(name);
 			if (stopState == null) {
-				stopState = new State<>(name, State.StateType.Stop);
-				states.add(stopState);
+				stopState = createState(name, State.StateType.Stop);
 				stopState.addTake(notCondition);
 				stopStates.put(name, stopState);
 			}
@@ -313,8 +344,7 @@ public class NFACompiler {
 				return sinkState;
 			}
 
-			final State<T> copyOfSink = new State<>(sinkState.getName(), sinkState.getStateType());
-			states.add(copyOfSink);
+			final State<T> copyOfSink = createState(sinkState.getName(), sinkState.getStateType());
 
 			for (StateTransition<T> tStateTransition : sinkState.getStateTransitions()) {
 
@@ -364,15 +394,6 @@ public class NFACompiler {
 			}
 		}
 
-		private void checkPatternNameUniqueness(String patternName) {
-			if (usedNames.contains(currentPattern.getName())) {
-				throw new MalformedPatternException(
-					"Duplicate pattern name: " + patternName + ". " +
-					"Pattern names must be unique.");
-			}
-			usedNames.add(patternName);
-		}
-
 		/**
 		 * Creates a "complex" state consisting of given number of states with
 		 * same {@link IterativeCondition}
@@ -396,12 +417,12 @@ public class NFACompiler {
 				return createSingletonState(lastSink, ignoreCondition, false);
 			}
 
-			final State<T> singletonState = createNormalState();
+			final State<T> singletonState = createState(currentPattern.getName(), State.StateType.Normal);
 			singletonState.addTake(lastSink, currentCondition);
 			singletonState.addProceed(sinkState, BooleanConditions.<T>trueFunction());
 
 			if (ignoreCondition != null) {
-				State<T> ignoreState = createNormalState();
+				State<T> ignoreState = createState(currentPattern.getName(), State.StateType.Normal);
 				ignoreState.addTake(lastSink, currentCondition);
 				ignoreState.addIgnore(ignoreCondition);
 				singletonState.addIgnore(ignoreState, ignoreCondition);
@@ -440,7 +461,7 @@ public class NFACompiler {
 			final IterativeCondition<T> currentCondition = (IterativeCondition<T>) currentPattern.getCondition();
 			final IterativeCondition<T> trueFunction = BooleanConditions.trueFunction();
 
-			final State<T> singletonState = createNormalState();
+			final State<T> singletonState = createState(currentPattern.getName(), State.StateType.Normal);
 			// if event is accepted then all notPatterns previous to the optional states are no longer valid
 			final State<T> sink = copyWithoutTransitiveNots(sinkState);
 			singletonState.addTake(sink, currentCondition);
@@ -453,7 +474,7 @@ public class NFACompiler {
 			if (ignoreCondition != null) {
 				final State<T> ignoreState;
 				if (isOptional) {
-					ignoreState = createNormalState();
+					ignoreState = createState(currentPattern.getName(), State.StateType.Normal);
 					ignoreState.addTake(sink, currentCondition);
 					ignoreState.addIgnore(ignoreCondition);
 					addStopStates(ignoreState);
@@ -479,14 +500,14 @@ public class NFACompiler {
 			final IterativeCondition<T> ignoreCondition = getInnerIgnoreCondition(currentPattern);
 			final IterativeCondition<T> trueFunction = BooleanConditions.trueFunction();
 
-			final State<T> loopingState = createNormalState();
+			final State<T> loopingState = createState(currentPattern.getName(), State.StateType.Normal);
 			loopingState.addProceed(sinkState, trueFunction);
 			loopingState.addTake(currentCondition);
 
 			addStopStateToLooping(loopingState);
 
 			if (ignoreCondition != null) {
-				final State<T> ignoreState = createNormalState();
+				final State<T> ignoreState = createState(currentPattern.getName(), State.StateType.Normal);
 				ignoreState.addTake(loopingState, currentCondition);
 				ignoreState.addIgnore(ignoreCondition);
 				loopingState.addIgnore(ignoreState, ignoreCondition);
@@ -507,7 +528,7 @@ public class NFACompiler {
 		private State<T> createInitMandatoryStateOfOneOrMore(final State<T> sinkState) {
 			final IterativeCondition<T> currentCondition = (IterativeCondition<T>) currentPattern.getCondition();
 
-			final State<T> firstState = createNormalState();
+			final State<T> firstState = createState(currentPattern.getName(), State.StateType.Normal);
 			firstState.addTake(sinkState, currentCondition);
 
 			final IterativeCondition<T> ignoreCondition = getIgnoreCondition(currentPattern);
@@ -528,13 +549,13 @@ public class NFACompiler {
 		private State<T> createInitOptionalStateOfZeroOrMore(final State<T> loopingState, final State<T> lastSink) {
 			final IterativeCondition<T> currentCondition = (IterativeCondition<T>) currentPattern.getCondition();
 
-			final State<T> firstState = createNormalState();
+			final State<T> firstState = createState(currentPattern.getName(), State.StateType.Normal);
 			firstState.addProceed(lastSink, BooleanConditions.<T>trueFunction());
 			firstState.addTake(loopingState, currentCondition);
 
 			final IterativeCondition<T> ignoreFunction = getIgnoreCondition(currentPattern);
 			if (ignoreFunction != null) {
-				final State<T> firstStateWithoutProceed = createNormalState();
+				final State<T> firstStateWithoutProceed = createState(currentPattern.getName(), State.StateType.Normal);
 				firstState.addIgnore(firstStateWithoutProceed, ignoreFunction);
 				firstStateWithoutProceed.addIgnore(ignoreFunction);
 				firstStateWithoutProceed.addTake(loopingState, currentCondition);

http://git-wip-us.apache.org/repos/asf/flink/blob/7a54d05e/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
index bac21b3..2ed7245 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
@@ -82,7 +82,7 @@ public abstract class AbstractKeyedCEPPatternOperator<IN, KEY, OUT>
 
 	///////////////			State			//////////////
 
-	private static final String NFA_OPERATOR_STATE_NAME = "nfaOperatorState";
+	private static final String NFA_OPERATOR_STATE_NAME = "nfaOperatorStateName";
 	private static final String PRIORITY_QUEUE_STATE_NAME = "priorityQueueStateName";
 
 	private transient ValueState<NFA<IN>> nfaOperatorState;
@@ -127,8 +127,8 @@ public abstract class AbstractKeyedCEPPatternOperator<IN, KEY, OUT>
 		if (nfaOperatorState == null) {
 			nfaOperatorState = getRuntimeContext().getState(
 				new ValueStateDescriptor<>(
-					NFA_OPERATOR_STATE_NAME,
-					new NFA.Serializer<IN>()));
+						NFA_OPERATOR_STATE_NAME,
+						new NFA.NFASerializer<>(inputSerializer)));
 		}
 
 		@SuppressWarnings("unchecked,rawtypes")
@@ -311,12 +311,20 @@ public abstract class AbstractKeyedCEPPatternOperator<IN, KEY, OUT>
 				VoidNamespaceSerializer.INSTANCE,
 				this);
 
+		// Access the NFA state under its old name and with the old serializer, so that previously written state can still be read.
+		ValueState<NFA<IN>> oldNfaOperatorState = getRuntimeContext().getState(
+				new ValueStateDescriptor<>("nfaOperatorState", new NFA.Serializer<IN>()));
+
 		if (migratingFromOldKeyedOperator) {
 			int numberEntries = inputView.readInt();
-			for (int i = 0; i <numberEntries; i++) {
+			for (int i = 0; i < numberEntries; i++) {
 				KEY key = keySerializer.deserialize(inputView);
 				setCurrentKey(key);
 				saveRegisterWatermarkTimer();
+
+				NFA<IN> nfa = oldNfaOperatorState.value();
+				oldNfaOperatorState.clear();
+				nfaOperatorState.update(nfa);
 			}
 		} else {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7a54d05e/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java
index 11d193a..2619764 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java
@@ -20,16 +20,19 @@ package org.apache.flink.cep.nfa;
 
 import org.apache.commons.io.output.ByteArrayOutputStream;
 import org.apache.flink.cep.Event;
+import org.apache.flink.cep.nfa.compiler.NFACompiler;
+import org.apache.flink.cep.pattern.Pattern;
 import org.apache.flink.cep.pattern.conditions.BooleanConditions;
+import org.apache.flink.cep.pattern.conditions.IterativeCondition;
 import org.apache.flink.cep.pattern.conditions.SimpleCondition;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.util.TestLogger;
 import org.junit.Test;
 
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -185,35 +188,136 @@ public class NFATest extends TestLogger {
 
 	@Test
 	public void testNFASerialization() throws IOException, ClassNotFoundException {
-		NFA<Event> nfa = new NFA<>(Event.createTypeSerializer(), 0, false);
-
-		State<Event> startingState = new State<>("", State.StateType.Start);
-		State<Event> startState = new State<>("start", State.StateType.Normal);
-		State<Event> endState = new State<>("end", State.StateType.Final);
-
-
-		startingState.addTake(
-			new NameFilter("start"));
-		startState.addTake(
-			new NameFilter("end"));
-		startState.addIgnore(null);
-
-		nfa.addState(startingState);
-		nfa.addState(startState);
-		nfa.addState(endState);
-
-		ByteArrayOutputStream baos = new ByteArrayOutputStream();
-		ObjectOutputStream oos = new ObjectOutputStream(baos);
-
-		oos.writeObject(nfa);
-
-		ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
-		ObjectInputStream ois = new ObjectInputStream(bais);
-
-		@SuppressWarnings("unchecked")
-		NFA<Event> copy = (NFA<Event>) ois.readObject();
-
-		assertEquals(nfa, copy);
+		Pattern<Event, ?> pattern1 = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 1858562682635302605L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("a");
+			}
+		}).followedByAny("middle").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 8061969839441121955L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("b");
+			}
+		}).oneOrMore().optional().allowCombinations().followedByAny("end").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 8061969839441121955L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("d");
+			}
+		});
+
+		Pattern<Event, ?> pattern2 = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 1858562682635302605L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("a");
+			}
+		}).notFollowedBy("not").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = -6085237016591726715L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("c");
+			}
+		}).followedByAny("middle").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 8061969839441121955L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("b");
+			}
+		}).oneOrMore().optional().allowCombinations().followedByAny("end").where(new IterativeCondition<Event>() {
+			private static final long serialVersionUID = 8061969839441121955L;
+
+			@Override
+			public boolean filter(Event value, Context<Event> ctx) throws Exception {
+				double sum = 0.0;
+				for (Event e : ctx.getEventsForPattern("middle")) {
+					sum += e.getPrice();
+				}
+				return sum > 5.0;
+			}
+		});
+
+		Pattern<Event, ?> pattern3 = Pattern.<Event>begin("start")
+				.notFollowedBy("not").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = -6085237016591726715L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("c");
+			}
+		}).followedByAny("middle").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 8061969839441121955L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("b");
+			}
+		}).oneOrMore().allowCombinations().followedByAny("end").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 8061969839441121955L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("d");
+			}
+		});
+
+		List<Pattern<Event, ?>> patterns = new ArrayList<>();
+		patterns.add(pattern1);
+		patterns.add(pattern2);
+		patterns.add(pattern3);
+
+		for (Pattern<Event, ?> p: patterns) {
+			NFACompiler.NFAFactory<Event> nfaFactory = NFACompiler.compileFactory(p, Event.createTypeSerializer(), false);
+			NFA<Event> nfa = nfaFactory.createNFA();
+
+			Event a = new Event(40, "a", 1.0);
+			Event b = new Event(41, "b", 2.0);
+			Event c = new Event(42, "c", 3.0);
+			Event b1 = new Event(41, "b", 3.0);
+			Event b2 = new Event(41, "b", 4.0);
+			Event b3 = new Event(41, "b", 5.0);
+			Event d = new Event(43, "d", 4.0);
+
+			nfa.process(a, 1);
+			nfa.process(b, 2);
+			nfa.process(c, 3);
+			nfa.process(b1, 4);
+			nfa.process(b2, 5);
+			nfa.process(b3, 6);
+			nfa.process(d, 7);
+			nfa.process(a, 8);
+
+			NFA.NFASerializer<Event> serializer = new NFA.NFASerializer<>(Event.createTypeSerializer());
+
+			// serialize
+			ByteArrayOutputStream baos = new ByteArrayOutputStream();
+			serializer.serialize(nfa, new DataOutputViewStreamWrapper(baos));
+			baos.close();
+
+			// copy
+			NFA.NFASerializer<Event> copySerializer = new NFA.NFASerializer<>(Event.createTypeSerializer());
+			ByteArrayInputStream in = new ByteArrayInputStream(baos.toByteArray());
+			ByteArrayOutputStream out = new ByteArrayOutputStream();
+			copySerializer.copy(new DataInputViewStreamWrapper(in), new DataOutputViewStreamWrapper(out));
+			in.close();
+			out.close();
+
+			// deserialize
+			ByteArrayInputStream bais = new ByteArrayInputStream(out.toByteArray());
+			NFA.NFASerializer<Event> deserializer = new NFA.NFASerializer<>(Event.createTypeSerializer());
+			NFA<Event> copy = deserializer.deserialize(new DataInputViewStreamWrapper(bais));
+			bais.close();
+
+			assertEquals(nfa, copy);
+		}
 	}
 
 	private NFA<Event> createStartEndNFA(long windowLength) {
@@ -251,20 +355,4 @@ public class NFATest extends TestLogger {
 
 		return nfa;
 	}
-
-	private static class NameFilter extends SimpleCondition<Event> {
-
-		private static final long serialVersionUID = 7472112494752423802L;
-
-		private final String name;
-
-		public NameFilter(final String name) {
-			this.name = name;
-		}
-
-		@Override
-		public boolean filter(Event value) throws Exception {
-			return value.getName().equals(name);
-		}
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7a54d05e/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SharedBufferTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SharedBufferTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SharedBufferTest.java
index ee94b6f..bd828b6 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SharedBufferTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SharedBufferTest.java
@@ -20,7 +20,10 @@ package org.apache.flink.cep.nfa;
 
 import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.ListMultimap;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.cep.Event;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.util.TestLogger;
 import org.junit.Test;
 
@@ -130,15 +133,14 @@ public class SharedBufferTest extends TestLogger {
 		sharedBuffer.put("a[]", events[6], timestamp, "a[]", events[5], timestamp, 5, DeweyNumber.fromString("1.1"));
 		sharedBuffer.put("b", events[7], timestamp, "a[]", events[6], timestamp, 6, DeweyNumber.fromString("1.1.0"));
 
-		ByteArrayOutputStream baos = new ByteArrayOutputStream();
-		ObjectOutputStream oos = new ObjectOutputStream(baos);
+		SharedBuffer.SharedBufferSerializer serializer = new SharedBuffer.SharedBufferSerializer(
+				StringSerializer.INSTANCE, Event.createTypeSerializer());
 
-		oos.writeObject(sharedBuffer);
+		ByteArrayOutputStream baos = new ByteArrayOutputStream();
+		serializer.serialize(sharedBuffer, new DataOutputViewStreamWrapper(baos));
 
 		ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
-		ObjectInputStream ois = new ObjectInputStream(bais);
-
-		SharedBuffer<String, Event> copy = (SharedBuffer<String, Event>)ois.readObject();
+		SharedBuffer<String, Event> copy = serializer.deserialize(new DataInputViewStreamWrapper(bais));
 
 		assertEquals(sharedBuffer, copy);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/7a54d05e/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPFrom12MigrationTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPFrom12MigrationTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPFrom12MigrationTest.java
index d9efb1b..fb05901 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPFrom12MigrationTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPFrom12MigrationTest.java
@@ -103,7 +103,6 @@ public class CEPFrom12MigrationTest {
 	}
 
 	@Test
-	@Ignore
 	public void testRestoreAfterBranchingPattern() throws Exception {
 
 		KeySelector<Event, Integer> keySelector = new KeySelector<Event, Integer>() {
@@ -171,6 +170,54 @@ public class CEPFrom12MigrationTest {
 		assertEquals(middleEvent2, patternMap2.get("middle").get(0));
 		assertEquals(endEvent, patternMap2.get("end").get(0));
 
+		// and now go for a checkpoint with the new serializers
+
+		final Event startEvent1 = new Event(42, "start", 2.0);
+		final SubEvent middleEvent3 = new SubEvent(42, "foo", 1.0, 11.0);
+		final Event endEvent1 = new Event(42, "end", 2.0);
+
+		harness.processElement(new StreamRecord<Event>(startEvent1, 21));
+		harness.processElement(new StreamRecord<Event>(middleEvent3, 23));
+
+		// simulate snapshot/restore with some elements in internal sorting queue
+		OperatorStateHandles snapshot = harness.snapshot(1L, 1L);
+		harness.close();
+
+		harness = new KeyedOneInputStreamOperatorTestHarness<>(
+				new KeyedCEPPatternOperator<>(
+						Event.createTypeSerializer(),
+						false,
+						IntSerializer.INSTANCE,
+						new NFAFactory(),
+						true),
+				keySelector,
+				BasicTypeInfo.INT_TYPE_INFO);
+
+		harness.setup();
+		harness.initializeState(snapshot);
+		harness.open();
+
+		harness.processElement(new StreamRecord<>(endEvent1, 25));
+
+		harness.processWatermark(new Watermark(50));
+
+		result = harness.getOutput();
+
+		// expect two output elements: the matched pattern and the watermark
+		assertEquals(2, result.size());
+
+		Object resultObject3 = result.poll();
+		assertTrue(resultObject3 instanceof StreamRecord);
+		StreamRecord<?> resultRecord3 = (StreamRecord<?>) resultObject3;
+		assertTrue(resultRecord3.getValue() instanceof Map);
+
+		@SuppressWarnings("unchecked")
+		Map<String, List<Event>> patternMap3 = (Map<String, List<Event>>) resultRecord3.getValue();
+
+		assertEquals(startEvent1, patternMap3.get("start").get(0));
+		assertEquals(middleEvent3, patternMap3.get("middle").get(0));
+		assertEquals(endEvent1, patternMap3.get("end").get(0));
+
 		harness.close();
 	}
 
@@ -220,7 +267,6 @@ public class CEPFrom12MigrationTest {
 	}
 
 	@Test
-	@Ignore
 	public void testRestoreStartingNewPatternAfterMigration() throws Exception {
 
 		KeySelector<Event, Integer> keySelector = new KeySelector<Event, Integer>() {
@@ -302,6 +348,54 @@ public class CEPFrom12MigrationTest {
 		assertEquals(middleEvent2, patternMap3.get("middle").get(0));
 		assertEquals(endEvent, patternMap3.get("end").get(0));
 
+		// and now go for a checkpoint with the new serializers
+
+		final Event startEvent3 = new Event(42, "start", 2.0);
+		final SubEvent middleEvent3 = new SubEvent(42, "foo", 1.0, 11.0);
+		final Event endEvent1 = new Event(42, "end", 2.0);
+
+		harness.processElement(new StreamRecord<Event>(startEvent3, 21));
+		harness.processElement(new StreamRecord<Event>(middleEvent3, 23));
+
+		// simulate snapshot/restore with some elements in internal sorting queue
+		OperatorStateHandles snapshot = harness.snapshot(1L, 1L);
+		harness.close();
+
+		harness = new KeyedOneInputStreamOperatorTestHarness<>(
+				new KeyedCEPPatternOperator<>(
+						Event.createTypeSerializer(),
+						false,
+						IntSerializer.INSTANCE,
+						new NFAFactory(),
+						true),
+				keySelector,
+				BasicTypeInfo.INT_TYPE_INFO);
+
+		harness.setup();
+		harness.initializeState(snapshot);
+		harness.open();
+
+		harness.processElement(new StreamRecord<>(endEvent1, 25));
+
+		harness.processWatermark(new Watermark(50));
+
+		result = harness.getOutput();
+
+		// expect two output elements: the matched pattern and the watermark
+		assertEquals(2, result.size());
+
+		Object resultObject4 = result.poll();
+		assertTrue(resultObject4 instanceof StreamRecord);
+		StreamRecord<?> resultRecord4 = (StreamRecord<?>) resultObject4;
+		assertTrue(resultRecord4.getValue() instanceof Map);
+
+		@SuppressWarnings("unchecked")
+		Map<String, List<Event>> patternMap4 = (Map<String, List<Event>>) resultRecord4.getValue();
+
+		assertEquals(startEvent3, patternMap4.get("start").get(0));
+		assertEquals(middleEvent3, patternMap4.get("middle").get(0));
+		assertEquals(endEvent1, patternMap4.get("end").get(0));
+
 		harness.close();
 	}
 
@@ -347,7 +441,6 @@ public class CEPFrom12MigrationTest {
 
 
 	@Test
-	@Ignore
 	public void testSinglePatternAfterMigration() throws Exception {
 
 		KeySelector<Event, Integer> keySelector = new KeySelector<Event, Integer>() {


Mime
View raw message