flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From srich...@apache.org
Subject [10/11] flink git commit: [FLINK-7461] Remove Backwards compatibility with <= Flink 1.1
Date Thu, 24 Aug 2017 18:22:41 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
index 11f14b9..78ac39c 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
@@ -43,11 +43,6 @@ import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.util.Preconditions;
 
-import org.apache.flink.shaded.guava18.com.google.common.base.Predicate;
-import org.apache.flink.shaded.guava18.com.google.common.collect.Iterators;
-
-import javax.annotation.Nullable;
-
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
@@ -55,7 +50,6 @@ import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
 import java.io.OptionalDataException;
 import java.io.Serializable;
-import java.lang.reflect.Field;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -716,9 +710,7 @@ public class NFA<T> implements Serializable {
 		return result;
 	}
 
-	//////////////////////			Fault-Tolerance / Migration			//////////////////////
-
-	private static final String BEGINNING_STATE_NAME = "$beginningState$";
+	//////////////////////			Fault-Tolerance			//////////////////////
 
 	private void readObject(ObjectInputStream ois) throws IOException, ClassNotFoundException {
 		ois.defaultReadObject();
@@ -729,103 +721,15 @@ public class NFA<T> implements Serializable {
 
 		final List<ComputationState<T>> readComputationStates = new ArrayList<>(numberComputationStates);
 
-		boolean afterMigration = false;
 		for (int i = 0; i < numberComputationStates; i++) {
 			ComputationState<T> computationState = readComputationState(ois);
-			if (computationState.getState().getName().equals(BEGINNING_STATE_NAME)) {
-				afterMigration = true;
-			}
-
 			readComputationStates.add(computationState);
 		}
 
-		if (afterMigration && !readComputationStates.isEmpty()) {
-			try {
-				//Backwards compatibility
-				this.computationStates.addAll(migrateNFA(readComputationStates));
-				final Field newSharedBufferField = NFA.class.getDeclaredField("eventSharedBuffer");
-				final Field sharedBufferField = NFA.class.getDeclaredField("sharedBuffer");
-				sharedBufferField.setAccessible(true);
-				newSharedBufferField.setAccessible(true);
-				newSharedBufferField.set(this, SharedBuffer.migrateSharedBuffer(this.sharedBuffer));
-				sharedBufferField.set(this, null);
-				sharedBufferField.setAccessible(false);
-				newSharedBufferField.setAccessible(false);
-			} catch (Exception e) {
-				throw new IllegalStateException("Could not migrate from earlier version", e);
-			}
-		} else {
-			this.computationStates.addAll(readComputationStates);
-		}
-
+		this.computationStates.addAll(readComputationStates);
 		nonDuplicatingTypeSerializer.clearReferences();
 	}
 
-	/**
-	 * Needed for backward compatibility. First migrates the {@link State} graph see {@link NFACompiler#migrateGraph(State)}.
-	 * Than recreates the {@link ComputationState}s with the new {@link State} graph.
-	 * @param readStates computation states read from snapshot
-	 * @return collection of migrated computation states
-	 */
-	private Collection<ComputationState<T>> migrateNFA(Collection<ComputationState<T>> readStates) {
-		final ArrayList<ComputationState<T>> computationStates = new ArrayList<>();
-
-		final State<T> startState = Iterators.find(
-			readStates.iterator(),
-			new Predicate<ComputationState<T>>() {
-				@Override
-				public boolean apply(@Nullable ComputationState<T> input) {
-					return input != null && input.getState().getName().equals(BEGINNING_STATE_NAME);
-				}
-			}).getState();
-
-		final Map<String, State<T>> convertedStates = NFACompiler.migrateGraph(startState);
-
-		for (ComputationState<T> readState : readStates) {
-			if (!readState.isStartState()) {
-				final String previousName = readState.getState().getName();
-				final String currentName = Iterators.find(
-					readState.getState().getStateTransitions().iterator(),
-					new Predicate<StateTransition<T>>() {
-						@Override
-						public boolean apply(@Nullable StateTransition<T> input) {
-							return input != null && input.getAction() == StateTransitionAction.TAKE;
-						}
-					}).getTargetState().getName();
-
-				final State<T> previousState = convertedStates.get(previousName);
-
-				computationStates.add(ComputationState.createState(
-					this,
-					convertedStates.get(currentName),
-					previousState,
-					readState.getEvent(),
-					0,
-					readState.getTimestamp(),
-					readState.getVersion(),
-					readState.getStartTimestamp()
-				));
-			}
-		}
-
-		final String startName = Iterators.find(convertedStates.values().iterator(), new Predicate<State<T>>() {
-			@Override
-			public boolean apply(@Nullable State<T> input) {
-				return input != null && input.isStart();
-			}
-		}).getName();
-
-		computationStates.add(ComputationState.createStartState(
-			this,
-			convertedStates.get(startName),
-			new DeweyNumber(this.startEventCounter)));
-
-		this.states.clear();
-		this.states.addAll(convertedStates.values());
-
-		return computationStates;
-	}
-
 	@SuppressWarnings("unchecked")
 	private ComputationState<T> readComputationState(ObjectInputStream ois) throws IOException, ClassNotFoundException {
 		final State<T> state = (State<T>) ois.readObject();

http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
index c6f69b9..c36e7df 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.cep.nfa;
 
-import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeutils.CompatibilityResult;
 import org.apache.flink.api.common.typeutils.CompatibilityUtil;
 import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
@@ -335,47 +334,6 @@ public class SharedBuffer<K extends Serializable, V> implements Serializable {
 		this.pages = pages;
 	}
 
-	/**
-	 * For backward compatibility only. Previously the key in {@link SharedBuffer} was {@link State}.
-	 * Now it is {@link String}.
-	 */
-	@Internal
-	static <T> SharedBuffer<String, T> migrateSharedBuffer(SharedBuffer<State<T>, T> buffer) {
-
-		final Map<String, SharedBufferPage<String, T>> pageMap = new HashMap<>();
-		final Map<SharedBufferEntry<State<T>, T>, SharedBufferEntry<String, T>> entries = new HashMap<>();
-
-		for (Map.Entry<State<T>, SharedBufferPage<State<T>, T>> page : buffer.pages.entrySet()) {
-			final SharedBufferPage<String, T> newPage = new SharedBufferPage<>(page.getKey().getName());
-			pageMap.put(newPage.getKey(), newPage);
-
-			for (Map.Entry<ValueTimeWrapper<T>, SharedBufferEntry<State<T>, T>> pageEntry : page.getValue().entries.entrySet()) {
-				final SharedBufferEntry<String, T> newSharedBufferEntry = new SharedBufferEntry<>(
-					pageEntry.getKey(),
-					newPage);
-				newSharedBufferEntry.referenceCounter = pageEntry.getValue().referenceCounter;
-				entries.put(pageEntry.getValue(), newSharedBufferEntry);
-				newPage.entries.put(pageEntry.getKey(), newSharedBufferEntry);
-			}
-		}
-
-		for (Map.Entry<State<T>, SharedBufferPage<State<T>, T>> page : buffer.pages.entrySet()) {
-			for (Map.Entry<ValueTimeWrapper<T>, SharedBufferEntry<State<T>, T>> pageEntry : page.getValue().entries.entrySet()) {
-				final SharedBufferEntry<String, T> newEntry = entries.get(pageEntry.getValue());
-				for (SharedBufferEdge<State<T>, T> edge : pageEntry.getValue().edges) {
-					final SharedBufferEntry<String, T> targetNewEntry = entries.get(edge.getTarget());
-
-					final SharedBufferEdge<String, T> newEdge = new SharedBufferEdge<>(
-						targetNewEntry,
-						edge.getVersion());
-					newEntry.edges.add(newEdge);
-				}
-			}
-		}
-
-		return new SharedBuffer<>(buffer.valueSerializer, pageMap);
-	}
-
 	private SharedBufferEntry<K, V> get(
 			final K key,
 			final V value,
@@ -1177,76 +1135,4 @@ public class SharedBuffer<K extends Serializable, V> implements Serializable {
 			return CompatibilityResult.requiresMigration();
 		}
 	}
-
-	//////////////////			Java Serialization methods for backwards compatibility			//////////////////
-
-	private void readObject(ObjectInputStream ois) throws IOException, ClassNotFoundException {
-		DataInputViewStreamWrapper source = new DataInputViewStreamWrapper(ois);
-		ArrayList<SharedBufferEntry<K, V>> entryList = new ArrayList<>();
-		ois.defaultReadObject();
-
-		this.pages = new HashMap<>();
-
-		int numberPages = ois.readInt();
-
-		for (int i = 0; i < numberPages; i++) {
-			// key of the page
-			@SuppressWarnings("unchecked")
-			K key = (K) ois.readObject();
-
-			SharedBufferPage<K, V> page = new SharedBufferPage<>(key);
-
-			pages.put(key, page);
-
-			int numberEntries = ois.readInt();
-
-			for (int j = 0; j < numberEntries; j++) {
-				// restore the SharedBufferEntries for the given page
-				V value = valueSerializer.deserialize(source);
-				long timestamp = ois.readLong();
-
-				ValueTimeWrapper<V> valueTimeWrapper = new ValueTimeWrapper<>(value, timestamp, 0);
-				SharedBufferEntry<K, V> sharedBufferEntry = new SharedBufferEntry<K, V>(valueTimeWrapper, page);
-
-				sharedBufferEntry.referenceCounter = ois.readInt();
-
-				page.entries.put(valueTimeWrapper, sharedBufferEntry);
-
-				entryList.add(sharedBufferEntry);
-			}
-		}
-
-		// read the edges of the shared buffer entries
-		int numberEdges = ois.readInt();
-
-		for (int j = 0; j < numberEdges; j++) {
-			int sourceIndex = ois.readInt();
-			int targetIndex = ois.readInt();
-
-			if (sourceIndex >= entryList.size() || sourceIndex < 0) {
-				throw new RuntimeException("Could not find source entry with index " + sourceIndex +
-						". This indicates a corrupted state.");
-			} else {
-				// We've already deserialized the shared buffer entry. Simply read its ID and
-				// retrieve the buffer entry from the list of entries
-				SharedBufferEntry<K, V> sourceEntry = entryList.get(sourceIndex);
-
-				final DeweyNumber version = (DeweyNumber) ois.readObject();
-				final SharedBufferEntry<K, V> target;
-
-				if (targetIndex >= 0) {
-					if (targetIndex >= entryList.size()) {
-						throw new RuntimeException("Could not find target entry with index " + targetIndex +
-								". This indicates a corrupted state.");
-					} else {
-						target = entryList.get(targetIndex);
-					}
-				} else {
-					target = null;
-				}
-
-				sourceEntry.edges.add(new SharedBufferEdge<K, V>(target, version));
-			}
-		}
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
index 593c94f..5698de6 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.cep.nfa.compiler;
 
-import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.cep.nfa.NFA;
@@ -36,11 +35,6 @@ import org.apache.flink.cep.pattern.conditions.IterativeCondition;
 import org.apache.flink.cep.pattern.conditions.NotCondition;
 import org.apache.flink.streaming.api.windowing.time.Time;
 
-import org.apache.flink.shaded.guava18.com.google.common.base.Predicate;
-import org.apache.flink.shaded.guava18.com.google.common.collect.Iterators;
-
-import javax.annotation.Nullable;
-
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -858,114 +852,6 @@ public class NFACompiler {
 	}
 
 	/**
-	 * Used for migrating CEP graphs prior to 1.3. It removes the dummy start, adds the dummy end, and translates all
-	 * states to consuming ones by moving all TAKEs and IGNOREs to the next state. This method assumes each state
-	 * has at most one TAKE and one IGNORE and name of each state is unique. No PROCEED transition is allowed!
-	 *
-	 * @param oldStartState dummy start state of old graph
-	 * @param <T>           type of events
-	 * @return map of new states, where key is the name of a state and value is the state itself
-	 */
-	@Internal
-	public static <T> Map<String, State<T>> migrateGraph(State<T> oldStartState) {
-		State<T> oldFirst = oldStartState;
-		State<T> oldSecond = oldStartState.getStateTransitions().iterator().next().getTargetState();
-
-		StateTransition<T> oldFirstToSecondTake = Iterators.find(
-			oldFirst.getStateTransitions().iterator(),
-			new Predicate<StateTransition<T>>() {
-				@Override
-				public boolean apply(@Nullable StateTransition<T> input) {
-					return input != null && input.getAction() == StateTransitionAction.TAKE;
-				}
-
-			});
-
-		StateTransition<T> oldFirstIgnore = Iterators.find(
-			oldFirst.getStateTransitions().iterator(),
-			new Predicate<StateTransition<T>>() {
-				@Override
-				public boolean apply(@Nullable StateTransition<T> input) {
-					return input != null && input.getAction() == StateTransitionAction.IGNORE;
-				}
-
-			}, null);
-
-		StateTransition<T> oldSecondToThirdTake = Iterators.find(
-			oldSecond.getStateTransitions().iterator(),
-			new Predicate<StateTransition<T>>() {
-				@Override
-				public boolean apply(@Nullable StateTransition<T> input) {
-					return input != null && input.getAction() == StateTransitionAction.TAKE;
-				}
-
-			}, null);
-
-		final Map<String, State<T>> convertedStates = new HashMap<>();
-		State<T> newSecond;
-		State<T> newFirst = new State<>(oldSecond.getName(), State.StateType.Start);
-		convertedStates.put(newFirst.getName(), newFirst);
-		while (oldSecondToThirdTake != null) {
-
-			newSecond = new State<T>(oldSecondToThirdTake.getTargetState().getName(), State.StateType.Normal);
-			convertedStates.put(newSecond.getName(), newSecond);
-			newFirst.addTake(newSecond, oldFirstToSecondTake.getCondition());
-
-			if (oldFirstIgnore != null) {
-				newFirst.addIgnore(oldFirstIgnore.getCondition());
-			}
-
-			oldFirst = oldSecond;
-
-			oldFirstToSecondTake = Iterators.find(
-				oldFirst.getStateTransitions().iterator(),
-				new Predicate<StateTransition<T>>() {
-					@Override
-					public boolean apply(@Nullable StateTransition<T> input) {
-						return input != null && input.getAction() == StateTransitionAction.TAKE;
-					}
-
-				});
-
-			oldFirstIgnore = Iterators.find(
-				oldFirst.getStateTransitions().iterator(),
-				new Predicate<StateTransition<T>>() {
-					@Override
-					public boolean apply(@Nullable StateTransition<T> input) {
-						return input != null && input.getAction() == StateTransitionAction.IGNORE;
-					}
-
-				}, null);
-
-			oldSecond = oldSecondToThirdTake.getTargetState();
-
-			oldSecondToThirdTake = Iterators.find(
-				oldSecond.getStateTransitions().iterator(),
-				new Predicate<StateTransition<T>>() {
-					@Override
-					public boolean apply(@Nullable StateTransition<T> input) {
-						return input != null && input.getAction() == StateTransitionAction.TAKE;
-					}
-
-				}, null);
-
-			newFirst = newSecond;
-		}
-
-		final State<T> endingState = new State<>(ENDING_STATE_NAME, State.StateType.Final);
-
-		newFirst.addTake(endingState, oldFirstToSecondTake.getCondition());
-
-		if (oldFirstIgnore != null) {
-			newFirst.addIgnore(oldFirstIgnore.getCondition());
-		}
-
-		convertedStates.put(endingState.getName(), endingState);
-
-		return convertedStates;
-	}
-
-	/**
 	 * Factory interface for {@link NFA}.
 	 *
 	 * @param <T> Type of the input events which are processed by the NFA

http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
index 7556d9f..257d3e7 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
@@ -24,48 +24,29 @@ import org.apache.flink.api.common.state.MapState;
 import org.apache.flink.api.common.state.MapStateDescriptor;
 import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.api.common.typeutils.CompatibilityResult;
-import org.apache.flink.api.common.typeutils.CompatibilityUtil;
-import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
-import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
-import org.apache.flink.api.common.typeutils.base.CollectionSerializerConfigSnapshot;
 import org.apache.flink.api.common.typeutils.base.ListSerializer;
 import org.apache.flink.api.common.typeutils.base.LongSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.cep.EventComparator;
 import org.apache.flink.cep.nfa.NFA;
 import org.apache.flink.cep.nfa.compiler.NFACompiler;
-import org.apache.flink.core.fs.FSDataInputStream;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataInputViewStreamWrapper;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.migration.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer;
 import org.apache.flink.runtime.state.StateInitializationContext;
 import org.apache.flink.runtime.state.VoidNamespace;
 import org.apache.flink.runtime.state.VoidNamespaceSerializer;
 import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
-import org.apache.flink.streaming.api.operators.CheckpointedRestoringOperator;
 import org.apache.flink.streaming.api.operators.InternalTimer;
 import org.apache.flink.streaming.api.operators.InternalTimerService;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.Triggerable;
-import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
-import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.util.Migration;
 import org.apache.flink.util.Preconditions;
 
 import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
 import java.util.PriorityQueue;
 import java.util.stream.Stream;
 import java.util.stream.StreamSupport;
@@ -80,23 +61,17 @@ import java.util.stream.StreamSupport;
  * @param <IN> Type of the input elements
  * @param <KEY> Type of the key on which the input stream is keyed
  * @param <OUT> Type of the output elements
- * @param <F> user function that can be applied to matching sequences or timed out sequences
  */
 public abstract class AbstractKeyedCEPPatternOperator<IN, KEY, OUT, F extends Function>
 	extends AbstractUdfStreamOperator<OUT, F>
-	implements OneInputStreamOperator<IN, OUT>, Triggerable<KEY, VoidNamespace>, CheckpointedRestoringOperator {
+	implements OneInputStreamOperator<IN, OUT>, Triggerable<KEY, VoidNamespace> {
 
 	private static final long serialVersionUID = -4166778210774160757L;
 
-	private static final int INITIAL_PRIORITY_QUEUE_CAPACITY = 11;
-
 	private final boolean isProcessingTime;
 
 	private final TypeSerializer<IN> inputSerializer;
 
-	// necessary to serialize the set of seen keys
-	private final TypeSerializer<KEY> keySerializer;
-
 	///////////////			State			//////////////
 
 	private static final String NFA_OPERATOR_STATE_NAME = "nfaOperatorStateName";
@@ -115,12 +90,6 @@ public abstract class AbstractKeyedCEPPatternOperator<IN, KEY, OUT, F extends Fu
 	 */
 	private long lastWatermark;
 
-	/**
-	 * A flag used in the case of migration that indicates if
-	 * we are restoring from an old keyed or non-keyed operator.
-	 */
-	private final boolean migratingFromOldKeyedOperator;
-
 	private final EventComparator<IN> comparator;
 
 	public AbstractKeyedCEPPatternOperator(
@@ -135,10 +104,7 @@ public abstract class AbstractKeyedCEPPatternOperator<IN, KEY, OUT, F extends Fu
 
 		this.inputSerializer = Preconditions.checkNotNull(inputSerializer);
 		this.isProcessingTime = Preconditions.checkNotNull(isProcessingTime);
-		this.keySerializer = Preconditions.checkNotNull(keySerializer);
 		this.nfaFactory = Preconditions.checkNotNull(nfaFactory);
-
-		this.migratingFromOldKeyedOperator = migratingFromOldKeyedOperator;
 		this.comparator = comparator;
 	}
 
@@ -384,295 +350,6 @@ public abstract class AbstractKeyedCEPPatternOperator<IN, KEY, OUT, F extends Fu
 			long timestamp) throws Exception {
 	}
 
-	//////////////////////			Backwards Compatibility			//////////////////////
-
-	@Override
-	public void restoreState(FSDataInputStream in) throws Exception {
-		if (in instanceof Migration) {
-			// absorb the introduced byte from the migration stream
-			int hasUdfState = in.read();
-			if (hasUdfState == 1) {
-				throw new Exception("Found UDF state but CEPOperator is not an UDF operator.");
-			}
-		}
-
-		DataInputViewStreamWrapper inputView = new DataInputViewStreamWrapper(in);
-		timerService = getInternalTimerService(
-				"watermark-callbacks",
-				VoidNamespaceSerializer.INSTANCE,
-				this);
-
-		// this is with the old serializer so that we can read the state.
-		ValueState<NFA<IN>> oldNfaOperatorState = getRuntimeContext().getState(
-				new ValueStateDescriptor<>("nfaOperatorState", new NFA.Serializer<IN>()));
-
-		ValueState<PriorityQueue<StreamRecord<IN>>> oldPriorityQueueOperatorState =
-				getRuntimeContext().getState(
-					new ValueStateDescriptor<>(
-							"priorityQueueStateName",
-							new PriorityQueueSerializer<>(
-									((TypeSerializer) new StreamElementSerializer<>(inputSerializer)),
-									new PriorityQueueStreamRecordFactory<IN>()
-							)
-					)
-			);
-
-		if (migratingFromOldKeyedOperator) {
-			int numberEntries = inputView.readInt();
-			for (int i = 0; i < numberEntries; i++) {
-				KEY key = keySerializer.deserialize(inputView);
-				setCurrentKey(key);
-				saveRegisterWatermarkTimer();
-
-				NFA<IN> nfa = oldNfaOperatorState.value();
-				oldNfaOperatorState.clear();
-				nfaOperatorState.update(nfa);
-
-				PriorityQueue<StreamRecord<IN>> priorityQueue = oldPriorityQueueOperatorState.value();
-				if (priorityQueue != null && !priorityQueue.isEmpty()) {
-					Map<Long, List<IN>> elementMap = new HashMap<>();
-					for (StreamRecord<IN> record: priorityQueue) {
-						long timestamp = record.getTimestamp();
-						IN element = record.getValue();
-
-						List<IN> elements = elementMap.get(timestamp);
-						if (elements == null) {
-							elements = new ArrayList<>();
-							elementMap.put(timestamp, elements);
-						}
-						elements.add(element);
-					}
-
-					// write the old state into the new one.
-					for (Map.Entry<Long, List<IN>> entry: elementMap.entrySet()) {
-						elementQueueState.put(entry.getKey(), entry.getValue());
-					}
-
-					// clear the old state
-					oldPriorityQueueOperatorState.clear();
-				}
-			}
-		} else {
-
-			final ObjectInputStream ois = new ObjectInputStream(in);
-
-			// retrieve the NFA
-			@SuppressWarnings("unchecked")
-			NFA<IN> nfa = (NFA<IN>) ois.readObject();
-
-			// retrieve the elements that were pending in the priority queue
-			MultiplexingStreamRecordSerializer<IN> recordSerializer = new MultiplexingStreamRecordSerializer<>(inputSerializer);
-
-			Map<Long, List<IN>> elementMap = new HashMap<>();
-			int entries = ois.readInt();
-			for (int i = 0; i < entries; i++) {
-				StreamElement streamElement = recordSerializer.deserialize(inputView);
-				StreamRecord<IN> record = streamElement.<IN>asRecord();
-
-				long timestamp = record.getTimestamp();
-				IN element = record.getValue();
-
-				List<IN> elements = elementMap.get(timestamp);
-				if (elements == null) {
-					elements = new ArrayList<>();
-					elementMap.put(timestamp, elements);
-				}
-				elements.add(element);
-			}
-
-			// finally register the retrieved state with the new keyed state.
-			setCurrentKey((byte) 0);
-			nfaOperatorState.update(nfa);
-
-			// write the priority queue to the new map state.
-			for (Map.Entry<Long, List<IN>> entry: elementMap.entrySet()) {
-				elementQueueState.put(entry.getKey(), entry.getValue());
-			}
-
-			if (!isProcessingTime) {
-				// this is relevant only for event/ingestion time
-				setCurrentKey((byte) 0);
-				saveRegisterWatermarkTimer();
-			}
-			ois.close();
-		}
-	}
-
-	//////////////////////			Utility Classes			//////////////////////
-
-	/**
-	 * Custom type serializer implementation to serialize priority queues.
-	 *
-	 * @param <T> Type of the priority queue's elements
-	 */
-	private static class PriorityQueueSerializer<T> extends TypeSerializer<PriorityQueue<T>> {
-
-		private static final long serialVersionUID = -231980397616187715L;
-
-		private final TypeSerializer<T> elementSerializer;
-		private final PriorityQueueFactory<T> factory;
-
-		PriorityQueueSerializer(final TypeSerializer<T> elementSerializer, final PriorityQueueFactory<T> factory) {
-			this.elementSerializer = elementSerializer;
-			this.factory = factory;
-		}
-
-		@Override
-		public boolean isImmutableType() {
-			return false;
-		}
-
-		@Override
-		public TypeSerializer<PriorityQueue<T>> duplicate() {
-			return new PriorityQueueSerializer<>(elementSerializer.duplicate(), factory);
-		}
-
-		@Override
-		public PriorityQueue<T> createInstance() {
-			return factory.createPriorityQueue();
-		}
-
-		@Override
-		public PriorityQueue<T> copy(PriorityQueue<T> from) {
-			PriorityQueue<T> result = factory.createPriorityQueue();
-
-			for (T element: from) {
-				result.offer(elementSerializer.copy(element));
-			}
-
-			return result;
-		}
-
-		@Override
-		public PriorityQueue<T> copy(PriorityQueue<T> from, PriorityQueue<T> reuse) {
-			reuse.clear();
-
-			for (T element: from) {
-				reuse.offer(elementSerializer.copy(element));
-			}
-
-			return reuse;
-		}
-
-		@Override
-		public int getLength() {
-			return 0;
-		}
-
-		@Override
-		public void serialize(PriorityQueue<T> record, DataOutputView target) throws IOException {
-			target.writeInt(record.size());
-
-			for (T element: record) {
-				elementSerializer.serialize(element, target);
-			}
-		}
-
-		@Override
-		public PriorityQueue<T> deserialize(DataInputView source) throws IOException {
-			PriorityQueue<T> result = factory.createPriorityQueue();
-
-			return deserialize(result, source);
-		}
-
-		@Override
-		public PriorityQueue<T> deserialize(PriorityQueue<T> reuse, DataInputView source) throws IOException {
-			reuse.clear();
-
-			int numberEntries = source.readInt();
-
-			for (int i = 0; i < numberEntries; i++) {
-				reuse.offer(elementSerializer.deserialize(source));
-			}
-
-			return reuse;
-		}
-
-		@Override
-		public void copy(DataInputView source, DataOutputView target) throws IOException {
-
-		}
-
-		@Override
-		public boolean equals(Object obj) {
-			if (obj instanceof PriorityQueueSerializer) {
-				@SuppressWarnings("unchecked")
-				PriorityQueueSerializer<T> other = (PriorityQueueSerializer<T>) obj;
-
-				return factory.equals(other.factory) && elementSerializer.equals(other.elementSerializer);
-			} else {
-				return false;
-			}
-		}
-
-		@Override
-		public boolean canEqual(Object obj) {
-			return obj instanceof PriorityQueueSerializer;
-		}
-
-		@Override
-		public int hashCode() {
-			return Objects.hash(factory, elementSerializer);
-		}
-
-		// --------------------------------------------------------------------------------------------
-		// Serializer configuration snapshotting & compatibility
-		// --------------------------------------------------------------------------------------------
-
-		@Override
-		public TypeSerializerConfigSnapshot snapshotConfiguration() {
-			return new CollectionSerializerConfigSnapshot<>(elementSerializer);
-		}
-
-		@Override
-		public CompatibilityResult<PriorityQueue<T>> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
-			if (configSnapshot instanceof CollectionSerializerConfigSnapshot) {
-				Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot> previousElemSerializerAndConfig =
-					((CollectionSerializerConfigSnapshot) configSnapshot).getSingleNestedSerializerAndConfig();
-
-				CompatibilityResult<T> compatResult = CompatibilityUtil.resolveCompatibilityResult(
-					previousElemSerializerAndConfig.f0,
-					UnloadableDummyTypeSerializer.class,
-					previousElemSerializerAndConfig.f1,
-					elementSerializer);
-
-				if (!compatResult.isRequiresMigration()) {
-					return CompatibilityResult.compatible();
-				} else if (compatResult.getConvertDeserializer() != null) {
-					return CompatibilityResult.requiresMigration(
-						new PriorityQueueSerializer<>(
-							new TypeDeserializerAdapter<>(compatResult.getConvertDeserializer()), factory));
-				}
-			}
-
-			return CompatibilityResult.requiresMigration();
-		}
-	}
-
-	private interface PriorityQueueFactory<T> extends Serializable {
-		PriorityQueue<T> createPriorityQueue();
-	}
-
-	private static class PriorityQueueStreamRecordFactory<T> implements PriorityQueueFactory<StreamRecord<T>> {
-
-		private static final long serialVersionUID = 1254766984454616593L;
-
-		@Override
-		public PriorityQueue<StreamRecord<T>> createPriorityQueue() {
-			return new PriorityQueue<StreamRecord<T>>(INITIAL_PRIORITY_QUEUE_CAPACITY, new StreamRecordComparator<T>());
-		}
-
-		@Override
-		public boolean equals(Object obj) {
-			return obj instanceof PriorityQueueStreamRecordFactory;
-		}
-
-		@Override
-		public int hashCode() {
-			return getClass().hashCode();
-		}
-	}
-
 	//////////////////////			Testing Methods			//////////////////////
 
 	@VisibleForTesting

http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java
deleted file mode 100644
index 843d668..0000000
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java
+++ /dev/null
@@ -1,346 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.cep.operator;
-
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeutils.base.ByteSerializer;
-import org.apache.flink.api.java.functions.NullByteKeySelector;
-import org.apache.flink.cep.Event;
-import org.apache.flink.cep.SubEvent;
-import org.apache.flink.cep.nfa.NFA;
-import org.apache.flink.cep.nfa.compiler.NFACompiler;
-import org.apache.flink.cep.pattern.Pattern;
-import org.apache.flink.cep.pattern.conditions.SimpleCondition;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.api.windowing.time.Time;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
-import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
-import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
-
-import org.junit.Test;
-
-import java.net.URL;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentLinkedQueue;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-/**
- * Tests for migration from 1.1.x to 1.3.x.
- */
-public class CEPMigration11to13Test {
-
-	private static String getResourceFilename(String filename) {
-		ClassLoader cl = CEPMigration11to13Test.class.getClassLoader();
-		URL resource = cl.getResource(filename);
-		if (resource == null) {
-			throw new NullPointerException("Missing snapshot resource.");
-		}
-		return resource.getFile();
-	}
-
-	@Test
-	public void testKeyedCEPOperatorMigratation() throws Exception {
-
-		final Event startEvent = new Event(42, "start", 1.0);
-		final SubEvent middleEvent = new SubEvent(42, "foo", 1.0, 10.0);
-		final Event endEvent = new Event(42, "end", 1.0);
-
-		// uncomment these lines for regenerating the snapshot on Flink 1.1
-		/*
-		OneInputStreamOperatorTestHarness<Event, Map<String, Event>> harness = new OneInputStreamOperatorTestHarness<>(
-				new KeyedCepOperator<>(
-						Event.createTypeSerializer(),
-						false,
-						keySelector,
-						IntSerializer.INSTANCE,
-						new NFAFactory()));
-		harness.configureForKeyedStream(keySelector, BasicTypeInfo.INT_TYPE_INFO);
-		harness.open();
-		harness.processElement(new StreamRecord<Event>(startEvent, 1));
-		harness.processElement(new StreamRecord<Event>(new Event(42, "foobar", 1.0), 2));
-		harness.processElement(new StreamRecord<Event>(new SubEvent(42, "barfoo", 1.0, 5.0), 3));
-		harness.processWatermark(new Watermark(2));
-
-		harness.processElement(new StreamRecord<Event>(middleEvent, 3));
-
-		// simulate snapshot/restore with empty element queue but NFA state
-		StreamTaskState snapshot = harness.snapshot(1, 1);
-		FileOutputStream out = new FileOutputStream(
-				"src/test/resources/cep-keyed-1_1-snapshot");
-		ObjectOutputStream oos = new ObjectOutputStream(out);
-		oos.writeObject(snapshot);
-		out.close();
-		harness.close();
-		*/
-
-		OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness = CepOperatorTestUtilities.getCepTestHarness(
-			CepOperatorTestUtilities.getKeyedCepOpearator(false, new NFAFactory()));
-
-		try {
-			harness.setup();
-			harness
-				.initializeStateFromLegacyCheckpoint(getResourceFilename("cep-keyed-1_1-snapshot"));
-			harness.open();
-
-			harness.processElement(new StreamRecord<>(new Event(42, "start", 1.0), 4));
-			harness.processElement(new StreamRecord<>(endEvent, 5));
-
-			harness.processWatermark(new Watermark(20));
-
-			ConcurrentLinkedQueue<Object> result = harness.getOutput();
-
-			// watermark and the result
-			assertEquals(2, result.size());
-
-			Object resultObject = result.poll();
-			assertTrue(resultObject instanceof StreamRecord);
-			StreamRecord<?> resultRecord = (StreamRecord<?>) resultObject;
-			assertTrue(resultRecord.getValue() instanceof Map);
-
-			@SuppressWarnings("unchecked")
-			Map<String, List<Event>> patternMap =
-				(Map<String, List<Event>>) resultRecord.getValue();
-
-			assertEquals(startEvent, patternMap.get("start").get(0));
-			assertEquals(middleEvent, patternMap.get("middle").get(0));
-			assertEquals(endEvent, patternMap.get("end").get(0));
-
-			// and now go for a checkpoint with the new serializers
-
-			final Event startEvent1 = new Event(42, "start", 2.0);
-			final SubEvent middleEvent1 = new SubEvent(42, "foo", 1.0, 11.0);
-			final Event endEvent1 = new Event(42, "end", 2.0);
-
-			harness.processElement(new StreamRecord<Event>(startEvent1, 21));
-			harness.processElement(new StreamRecord<Event>(middleEvent1, 23));
-
-			// simulate snapshot/restore with some elements in internal sorting queue
-			OperatorStateHandles snapshot = harness.snapshot(1L, 1L);
-			harness.close();
-
-			harness = CepOperatorTestUtilities.getCepTestHarness(CepOperatorTestUtilities.getKeyedCepOpearator(
-				false,
-				new NFAFactory()));
-
-			harness.setup();
-			harness.initializeState(snapshot);
-			harness.open();
-
-			harness.processElement(new StreamRecord<>(endEvent1, 25));
-
-			harness.processWatermark(new Watermark(50));
-
-			result = harness.getOutput();
-
-			// watermark and the result
-			assertEquals(2, result.size());
-
-			Object resultObject1 = result.poll();
-			assertTrue(resultObject1 instanceof StreamRecord);
-			StreamRecord<?> resultRecord1 = (StreamRecord<?>) resultObject1;
-			assertTrue(resultRecord1.getValue() instanceof Map);
-
-			@SuppressWarnings("unchecked")
-			Map<String, List<Event>> patternMap1 =
-				(Map<String, List<Event>>) resultRecord1.getValue();
-
-			assertEquals(startEvent1, patternMap1.get("start").get(0));
-			assertEquals(middleEvent1, patternMap1.get("middle").get(0));
-			assertEquals(endEvent1, patternMap1.get("end").get(0));
-		} finally {
-			harness.close();
-		}
-	}
-
-	@Test
-	public void testNonKeyedCEPFunctionMigration() throws Exception {
-
-		final Event startEvent = new Event(42, "start", 1.0);
-		final SubEvent middleEvent = new SubEvent(42, "foo", 1.0, 10.0);
-		final Event endEvent = new Event(42, "end", 1.0);
-
-		// uncomment these lines for regenerating the snapshot on Flink 1.1
-		/*
-		OneInputStreamOperatorTestHarness<Event, Map<String, Event>> harness = new OneInputStreamOperatorTestHarness<>(
-				new CEPPatternOperator<>(
-						Event.createTypeSerializer(),
-						false,
-						new NFAFactory()));
-		harness.open();
-		harness.processElement(new StreamRecord<Event>(startEvent, 1));
-		harness.processElement(new StreamRecord<Event>(new Event(42, "foobar", 1.0), 2));
-		harness.processElement(new StreamRecord<Event>(new SubEvent(42, "barfoo", 1.0, 5.0), 3));
-		harness.processWatermark(new Watermark(2));
-
-		harness.processElement(new StreamRecord<Event>(middleEvent, 3));
-
-		// simulate snapshot/restore with empty element queue but NFA state
-		StreamTaskState snapshot = harness.snapshot(1, 1);
-		FileOutputStream out = new FileOutputStream(
-				"src/test/resources/cep-non-keyed-1.1-snapshot");
-		ObjectOutputStream oos = new ObjectOutputStream(out);
-		oos.writeObject(snapshot);
-		out.close();
-		harness.close();
-		*/
-
-		NullByteKeySelector keySelector = new NullByteKeySelector();
-
-		OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness =
-			new KeyedOneInputStreamOperatorTestHarness<Byte, Event, Map<String, List<Event>>>(
-				CepOperatorTestUtilities.getKeyedCepOpearator(false, new NFAFactory(), ByteSerializer.INSTANCE, false, null),
-				keySelector,
-				BasicTypeInfo.BYTE_TYPE_INFO);
-
-		try {
-			harness.setup();
-			harness.initializeStateFromLegacyCheckpoint(
-				getResourceFilename("cep-non-keyed-1.1-snapshot"));
-			harness.open();
-
-			harness.processElement(new StreamRecord<>(new Event(42, "start", 1.0), 4));
-			harness.processElement(new StreamRecord<>(endEvent, 5));
-
-			harness.processWatermark(new Watermark(20));
-
-			ConcurrentLinkedQueue<Object> result = harness.getOutput();
-
-			// watermark and the result
-			assertEquals(2, result.size());
-
-			Object resultObject = result.poll();
-			assertTrue(resultObject instanceof StreamRecord);
-			StreamRecord<?> resultRecord = (StreamRecord<?>) resultObject;
-			assertTrue(resultRecord.getValue() instanceof Map);
-
-			@SuppressWarnings("unchecked")
-			Map<String, List<Event>> patternMap =
-				(Map<String, List<Event>>) resultRecord.getValue();
-
-			assertEquals(startEvent, patternMap.get("start").get(0));
-			assertEquals(middleEvent, patternMap.get("middle").get(0));
-			assertEquals(endEvent, patternMap.get("end").get(0));
-
-			// and now go for a checkpoint with the new serializers
-
-			final Event startEvent1 = new Event(42, "start", 2.0);
-			final SubEvent middleEvent1 = new SubEvent(42, "foo", 1.0, 11.0);
-			final Event endEvent1 = new Event(42, "end", 2.0);
-
-			harness.processElement(new StreamRecord<Event>(startEvent1, 21));
-			harness.processElement(new StreamRecord<Event>(middleEvent1, 23));
-
-			// simulate snapshot/restore with some elements in internal sorting queue
-			OperatorStateHandles snapshot = harness.snapshot(1L, 1L);
-			harness.close();
-
-			harness = new KeyedOneInputStreamOperatorTestHarness<Byte, Event, Map<String, List<Event>>>(
-				CepOperatorTestUtilities.getKeyedCepOpearator(false, new NFAFactory(), ByteSerializer.INSTANCE),
-				keySelector,
-				BasicTypeInfo.BYTE_TYPE_INFO);
-
-			harness.setup();
-			harness.initializeState(snapshot);
-			harness.open();
-
-			harness.processElement(new StreamRecord<>(endEvent1, 25));
-
-			harness.processWatermark(new Watermark(50));
-
-			result = harness.getOutput();
-
-			// watermark and the result
-			assertEquals(2, result.size());
-
-			Object resultObject1 = result.poll();
-			assertTrue(resultObject1 instanceof StreamRecord);
-			StreamRecord<?> resultRecord1 = (StreamRecord<?>) resultObject1;
-			assertTrue(resultRecord1.getValue() instanceof Map);
-
-			@SuppressWarnings("unchecked")
-			Map<String, List<Event>> patternMap1 =
-				(Map<String, List<Event>>) resultRecord1.getValue();
-
-			assertEquals(startEvent1, patternMap1.get("start").get(0));
-			assertEquals(middleEvent1, patternMap1.get("middle").get(0));
-			assertEquals(endEvent1, patternMap1.get("end").get(0));
-		} finally {
-			harness.close();
-		}
-	}
-
-	private static class NFAFactory implements NFACompiler.NFAFactory<Event> {
-
-		private static final long serialVersionUID = 1173020762472766713L;
-
-		private final boolean handleTimeout;
-
-		private NFAFactory() {
-			this(false);
-		}
-
-		private NFAFactory(boolean handleTimeout) {
-			this.handleTimeout = handleTimeout;
-		}
-
-		@Override
-		public NFA<Event> createNFA() {
-
-			Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new StartFilter())
-					.followedBy("middle").subtype(SubEvent.class).where(new MiddleFilter())
-					.followedBy("end").where(new EndFilter())
-					// add a window timeout to test whether timestamps of elements in the
-					// priority queue in CEP operator are correctly checkpointed/restored
-					.within(Time.milliseconds(10L));
-
-			return NFACompiler.compile(pattern, Event.createTypeSerializer(), handleTimeout);
-		}
-	}
-
-	private static class StartFilter extends SimpleCondition<Event> {
-		private static final long serialVersionUID = 5726188262756267490L;
-
-		@Override
-		public boolean filter(Event value) throws Exception {
-			return value.getName().equals("start");
-		}
-	}
-
-	private static class MiddleFilter extends SimpleCondition<SubEvent> {
-		private static final long serialVersionUID = 6215754202506583964L;
-
-		@Override
-		public boolean filter(SubEvent value) throws Exception {
-			return value.getVolume() > 5.0;
-		}
-	}
-
-	private static class EndFilter extends SimpleCondition<Event> {
-		private static final long serialVersionUID = 7056763917392056548L;
-
-		@Override
-		public boolean filter(Event value) throws Exception {
-			return value.getName().equals("end");
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigrationTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigrationTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigrationTest.java
index cf3c921..ed28f25 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigrationTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigrationTest.java
@@ -71,7 +71,7 @@ public class CEPMigrationTest {
 
 	@Parameterized.Parameters(name = "Migration Savepoint: {0}")
 	public static Collection<MigrationVersion> parameters () {
-		return Arrays.asList(MigrationVersion.v1_2, MigrationVersion.v1_3);
+		return Arrays.asList(MigrationVersion.v1_3);
 	}
 
 	public CEPMigrationTest(MigrationVersion migrateVersion) {

http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-runtime/src/main/java/org/apache/flink/migration/MigrationNamespaceSerializerProxy.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/migration/MigrationNamespaceSerializerProxy.java b/flink-runtime/src/main/java/org/apache/flink/migration/MigrationNamespaceSerializerProxy.java
deleted file mode 100644
index c4e23ca..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/migration/MigrationNamespaceSerializerProxy.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.migration;
-
-import org.apache.flink.api.common.typeutils.CompatibilityResult;
-import org.apache.flink.api.common.typeutils.ParameterlessTypeSerializerConfig;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-
-import java.io.IOException;
-import java.io.Serializable;
-
-/**
- * The purpose of this class is the be filled in as a placeholder for the namespace serializer when migrating from
- * Flink 1.1 savepoint (which did not include the namespace serializer) to Flink 1.2 (which always must include a
- * (non-null) namespace serializer. This is then replaced as soon as the user is re-registering her state again for
- * the first run under Flink 1.2 and provides again the real namespace serializer.
- *
- * @deprecated Internal class for savepoint backwards compatibility. Don't use for other purposes.
- */
-@Deprecated
-@SuppressWarnings("deprecation")
-public class MigrationNamespaceSerializerProxy extends TypeSerializer<Serializable> {
-
-	public static final MigrationNamespaceSerializerProxy INSTANCE = new MigrationNamespaceSerializerProxy();
-
-	private static final long serialVersionUID = -707800010807094491L;
-
-	private MigrationNamespaceSerializerProxy() {
-	}
-
-	@Override
-	public boolean isImmutableType() {
-		return false;
-	}
-
-	@Override
-	public TypeSerializer<Serializable> duplicate() {
-		return this;
-	}
-
-	@Override
-	public Serializable createInstance() {
-		throw new UnsupportedOperationException(
-				"This is just a proxy used during migration until the real type serializer is provided by the user.");
-	}
-
-	@Override
-	public Serializable copy(Serializable from) {
-		throw new UnsupportedOperationException(
-				"This is just a proxy used during migration until the real type serializer is provided by the user.");
-	}
-
-	@Override
-	public Serializable copy(Serializable from, Serializable reuse) {
-		throw new UnsupportedOperationException(
-				"This is just a proxy used during migration until the real type serializer is provided by the user.");
-	}
-
-	@Override
-	public int getLength() {
-		return -1;
-	}
-
-	@Override
-	public void serialize(Serializable record, DataOutputView target) throws IOException {
-		throw new UnsupportedOperationException(
-				"This is just a proxy used during migration until the real type serializer is provided by the user.");
-	}
-
-	@Override
-	public Serializable deserialize(DataInputView source) throws IOException {
-		throw new UnsupportedOperationException(
-				"This is just a proxy used during migration until the real type serializer is provided by the user.");
-	}
-
-	@Override
-	public Serializable deserialize(Serializable reuse, DataInputView source) throws IOException {
-		throw new UnsupportedOperationException(
-				"This is just a proxy used during migration until the real type serializer is provided by the user.");
-	}
-
-	@Override
-	public void copy(DataInputView source, DataOutputView target) throws IOException {
-		throw new UnsupportedOperationException(
-				"This is just a proxy used during migration until the real type serializer is provided by the user.");
-	}
-
-	@Override
-	public TypeSerializerConfigSnapshot snapshotConfiguration() {
-		return new ParameterlessTypeSerializerConfig(getClass().getCanonicalName());
-	}
-
-	@Override
-	public CompatibilityResult<Serializable> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
-		// always assume compatibility since we're just a proxy for migration
-		return CompatibilityResult.compatible();
-	}
-
-	@Override
-	public boolean equals(Object obj) {
-		return obj instanceof MigrationNamespaceSerializerProxy;
-	}
-
-	@Override
-	public boolean canEqual(Object obj) {
-		return true;
-	}
-
-	@Override
-	public int hashCode() {
-		return 42;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-runtime/src/main/java/org/apache/flink/migration/MigrationUtil.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/migration/MigrationUtil.java b/flink-runtime/src/main/java/org/apache/flink/migration/MigrationUtil.java
deleted file mode 100644
index a6055a8..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/migration/MigrationUtil.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.migration;
-
-import org.apache.flink.migration.state.MigrationKeyGroupStateHandle;
-import org.apache.flink.runtime.state.KeyedStateHandle;
-
-import java.util.Collection;
-
-/**
- * Utility functions for migration.
- */
-public class MigrationUtil {
-
-	@SuppressWarnings("deprecation")
-	public static boolean isOldSavepointKeyedState(Collection<KeyedStateHandle> keyedStateHandles) {
-		return (keyedStateHandles != null)
-				&& (keyedStateHandles.size() == 1)
-				&& (keyedStateHandles.iterator().next() instanceof MigrationKeyGroupStateHandle);
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-runtime/src/main/java/org/apache/flink/migration/api/common/state/ListStateDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/migration/api/common/state/ListStateDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/migration/api/common/state/ListStateDescriptor.java
deleted file mode 100644
index 5196d2d..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/migration/api/common/state/ListStateDescriptor.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.migration.api.common.state;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.state.ListState;
-import org.apache.flink.api.common.state.StateBinder;
-import org.apache.flink.api.common.state.StateDescriptor;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-
-/**
- * The old version of the {@link org.apache.flink.api.common.state.ListStateDescriptor}, retained for
- * serialization backwards compatibility.
- *
- * @deprecated Internal class for savepoint backwards compatibility. Don't use for other purposes.
- */
-@Internal
-@Deprecated
-@SuppressWarnings("deprecation")
-public class ListStateDescriptor<T> extends StateDescriptor<ListState<T>, T> {
-	private static final long serialVersionUID = 1L;
-
-	/**
-	 * Creates a new {@code ListStateDescriptor} with the given name and list element type.
-	 *
-	 * <p>If this constructor fails (because it is not possible to describe the type via a class),
-	 * consider using the {@link #ListStateDescriptor(String, TypeInformation)} constructor.
-	 *
-	 * @param name The (unique) name for the state.
-	 * @param typeClass The type of the values in the state.
-	 */
-	public ListStateDescriptor(String name, Class<T> typeClass) {
-		super(name, typeClass, null);
-	}
-
-	/**
-	 * Creates a new {@code ListStateDescriptor} with the given name and list element type.
-	 *
-	 * @param name The (unique) name for the state.
-	 * @param typeInfo The type of the values in the state.
-	 */
-	public ListStateDescriptor(String name, TypeInformation<T> typeInfo) {
-		super(name, typeInfo, null);
-	}
-
-	/**
-	 * Creates a new {@code ListStateDescriptor} with the given name and list element type.
-	 *
-	 * @param name The (unique) name for the state.
-	 * @param typeSerializer The type serializer for the list values.
-	 */
-	public ListStateDescriptor(String name, TypeSerializer<T> typeSerializer) {
-		super(name, typeSerializer, null);
-	}
-
-	// ------------------------------------------------------------------------
-
-	@Override
-	public ListState<T> bind(StateBinder stateBinder) throws Exception {
-		throw new IllegalStateException("Cannot bind states with a legacy state descriptor.");
-	}
-
-	@Override
-	public boolean equals(Object o) {
-		if (this == o) {
-			return true;
-		}
-		if (o == null || getClass() != o.getClass()) {
-			return false;
-		}
-
-		ListStateDescriptor<?> that = (ListStateDescriptor<?>) o;
-
-		return serializer.equals(that.serializer) && name.equals(that.name);
-
-	}
-
-	@Override
-	public int hashCode() {
-		int result = serializer.hashCode();
-		result = 31 * result + name.hashCode();
-		return result;
-	}
-
-	@Override
-	public String toString() {
-		return "ListStateDescriptor{" +
-				"serializer=" + serializer +
-				'}';
-	}
-
-	@Override
-	public org.apache.flink.api.common.state.StateDescriptor.Type getType() {
-		return org.apache.flink.api.common.state.StateDescriptor.Type.LIST;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/KeyGroupState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/KeyGroupState.java b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/KeyGroupState.java
deleted file mode 100644
index 0b25e08..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/KeyGroupState.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.migration.runtime.checkpoint;
-
-import org.apache.flink.migration.runtime.state.StateHandle;
-import org.apache.flink.migration.util.SerializedValue;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.Serializable;
-
-/**
- * Simple container class which contains the serialized state handle for a key group.
- *
- * The key group state handle is kept in serialized form because it can contain user code classes
- * which might not be available on the JobManager.
- *
- * @deprecated Internal class for savepoint backwards compatibility. Don't use for other purposes.
- */
-@Deprecated
-@SuppressWarnings("deprecation")
-public class KeyGroupState implements Serializable {
-	private static final long serialVersionUID = -5926696455438467634L;
-
-	private static final Logger LOG = LoggerFactory.getLogger(KeyGroupState.class);
-
-	private final SerializedValue<StateHandle<?>> keyGroupState;
-
-	private final long stateSize;
-
-	private final long duration;
-
-	public KeyGroupState(SerializedValue<StateHandle<?>> keyGroupState, long stateSize, long duration) {
-		this.keyGroupState = keyGroupState;
-
-		this.stateSize = stateSize;
-
-		this.duration = duration;
-	}
-
-	public SerializedValue<StateHandle<?>> getKeyGroupState() {
-		return keyGroupState;
-	}
-
-	public long getDuration() {
-		return duration;
-	}
-
-	public long getStateSize() {
-		return stateSize;
-	}
-
-	@Override
-	public boolean equals(Object obj) {
-		if (obj instanceof KeyGroupState) {
-			KeyGroupState other = (KeyGroupState) obj;
-
-			return keyGroupState.equals(other.keyGroupState) && stateSize == other.stateSize &&
-				duration == other.duration;
-		} else {
-			return false;
-		}
-	}
-
-	@Override
-	public int hashCode() {
-		return (int) (this.stateSize ^ this.stateSize >>> 32) +
-			31 * ((int) (this.duration ^ this.duration >>> 32) +
-				31 * keyGroupState.hashCode());
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/SubtaskState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/SubtaskState.java b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/SubtaskState.java
deleted file mode 100644
index d42d146..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/SubtaskState.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.migration.runtime.checkpoint;
-
-import org.apache.flink.migration.runtime.state.StateHandle;
-import org.apache.flink.migration.util.SerializedValue;
-
-import java.io.Serializable;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * @deprecated Internal class for savepoint backwards compatibility. Don't use for other purposes.
- */
-@Deprecated
-@SuppressWarnings("deprecation")
-public class SubtaskState implements Serializable {
-
-	private static final long serialVersionUID = -2394696997971923995L;
-
-	/** The state of the parallel operator */
-	private final SerializedValue<StateHandle<?>> state;
-
-	/**
-	 * The state size. This is also part of the deserialized state handle.
-	 * We store it here in order to not deserialize the state handle when
-	 * gathering stats.
-	 */
-	private final long stateSize;
-
-	/** The duration of the acknowledged (ack timestamp - trigger timestamp). */
-	private final long duration;
-	
-	public SubtaskState(
-			SerializedValue<StateHandle<?>> state,
-			long stateSize,
-			long duration) {
-
-		this.state = checkNotNull(state, "State");
-		// Sanity check and don't fail checkpoint because of this.
-		this.stateSize = stateSize >= 0 ? stateSize : 0;
-
-		this.duration = duration;
-	}
-
-	// --------------------------------------------------------------------------------------------
-	
-	public SerializedValue<StateHandle<?>> getState() {
-		return state;
-	}
-
-	public long getStateSize() {
-		return stateSize;
-	}
-
-	public long getDuration() {
-		return duration;
-	}
-
-	public void discard(ClassLoader userClassLoader) throws Exception {
-
-	}
-
-	// --------------------------------------------------------------------------------------------
-
-	@Override
-	public boolean equals(Object o) {
-		if (this == o) {
-			return true;
-		}
-		else if (o instanceof SubtaskState) {
-			SubtaskState that = (SubtaskState) o;
-			return this.state.equals(that.state) && stateSize == that.stateSize &&
-				duration == that.duration;
-		}
-		else {
-			return false;
-		}
-	}
-
-	@Override
-	public int hashCode() {
-		return (int) (this.stateSize ^ this.stateSize >>> 32) +
-			31 * ((int) (this.duration ^ this.duration >>> 32) +
-				31 * state.hashCode());
-	}
-
-	@Override
-	public String toString() {
-		return String.format("SubtaskState(Size: %d, Duration: %d, State: %s)", stateSize, duration, state);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/TaskState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/TaskState.java b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/TaskState.java
deleted file mode 100644
index c0a7b2d..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/TaskState.java
+++ /dev/null
@@ -1,164 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.migration.runtime.checkpoint;
-
-import org.apache.flink.migration.runtime.state.StateHandle;
-import org.apache.flink.migration.util.SerializedValue;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-
-import java.io.Serializable;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
-
-/**
- * @deprecated Internal class for savepoint backwards compatibility. Don't use for other purposes.
- */
-@Deprecated
-@SuppressWarnings("deprecation")
-public class TaskState implements Serializable {
-
-	private static final long serialVersionUID = -4845578005863201810L;
-
-	private final JobVertexID jobVertexID;
-
-	/** Map of task states which can be accessed by their sub task index */
-	private final Map<Integer, SubtaskState> subtaskStates;
-
-	/** Map of key-value states which can be accessed by their key group index */
-	private final Map<Integer, KeyGroupState> kvStates;
-
-	/** Parallelism of the operator when it was checkpointed */
-	private final int parallelism;
-
-	public TaskState(JobVertexID jobVertexID, int parallelism) {
-		this.jobVertexID = jobVertexID;
-
-		this.subtaskStates = new HashMap<>(parallelism);
-
-		this.kvStates = new HashMap<>();
-
-		this.parallelism = parallelism;
-	}
-
-	public JobVertexID getJobVertexID() {
-		return jobVertexID;
-	}
-
-	public void putState(int subtaskIndex, SubtaskState subtaskState) {
-		if (subtaskIndex < 0 || subtaskIndex >= parallelism) {
-			throw new IndexOutOfBoundsException("The given sub task index " + subtaskIndex +
-				" exceeds the maximum number of sub tasks " + subtaskStates.size());
-		} else {
-			subtaskStates.put(subtaskIndex, subtaskState);
-		}
-	}
-
-	public SubtaskState getState(int subtaskIndex) {
-		if (subtaskIndex < 0 || subtaskIndex >= parallelism) {
-			throw new IndexOutOfBoundsException("The given sub task index " + subtaskIndex +
-				" exceeds the maximum number of sub tasks " + subtaskStates.size());
-		} else {
-			return subtaskStates.get(subtaskIndex);
-		}
-	}
-
-	public Collection<SubtaskState> getStates() {
-		return subtaskStates.values();
-	}
-
-	public Map<Integer, SubtaskState> getSubtaskStatesById() {
-		return subtaskStates;
-	}
-
-	public long getStateSize() {
-		long result = 0L;
-
-		for (SubtaskState subtaskState : subtaskStates.values()) {
-			result += subtaskState.getStateSize();
-		}
-
-		for (KeyGroupState keyGroupState : kvStates.values()) {
-			result += keyGroupState.getStateSize();
-		}
-
-		return result;
-	}
-
-	public int getNumberCollectedStates() {
-		return subtaskStates.size();
-	}
-
-	public int getParallelism() {
-		return parallelism;
-	}
-
-	public void putKvState(int keyGroupId, KeyGroupState keyGroupState) {
-		kvStates.put(keyGroupId, keyGroupState);
-	}
-
-	public KeyGroupState getKvState(int keyGroupId) {
-		return kvStates.get(keyGroupId);
-	}
-
-	/**
-	 * Retrieve the set of key-value state key groups specified by the given key group partition set.
-	 * The key groups are returned as a map where the key group index maps to the serialized state
-	 * handle of the key group.
-	 *
-	 * @param keyGroupPartition Set of key group indices
-	 * @return Map of serialized key group state handles indexed by their key group index.
-	 */
-	public Map<Integer, SerializedValue<StateHandle<?>>> getUnwrappedKvStates(Set<Integer> keyGroupPartition) {
-		HashMap<Integer, SerializedValue<StateHandle<?>>> result = new HashMap<>(keyGroupPartition.size());
-
-		for (Integer keyGroupId : keyGroupPartition) {
-			KeyGroupState keyGroupState = kvStates.get(keyGroupId);
-
-			if (keyGroupState != null) {
-				result.put(keyGroupId, kvStates.get(keyGroupId).getKeyGroupState());
-			}
-		}
-
-		return result;
-	}
-
-	public int getNumberCollectedKvStates() {
-		return kvStates.size();
-	}
-
-	@Override
-	public boolean equals(Object obj) {
-		if (obj instanceof TaskState) {
-			TaskState other = (TaskState) obj;
-
-			return jobVertexID.equals(other.jobVertexID) && parallelism == other.parallelism &&
-				subtaskStates.equals(other.subtaskStates) && kvStates.equals(other.kvStates);
-		} else {
-			return false;
-		}
-	}
-
-	@Override
-	public int hashCode() {
-		return parallelism + 31 * Objects.hash(jobVertexID, subtaskStates, kvStates);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/savepoint/SavepointV0.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/savepoint/SavepointV0.java b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/savepoint/SavepointV0.java
deleted file mode 100644
index 7888d2f..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/savepoint/SavepointV0.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.migration.runtime.checkpoint.savepoint;
-
-import org.apache.flink.migration.runtime.checkpoint.TaskState;
-import org.apache.flink.runtime.checkpoint.MasterState;
-import org.apache.flink.runtime.checkpoint.OperatorState;
-import org.apache.flink.runtime.checkpoint.savepoint.Savepoint;
-import org.apache.flink.util.Preconditions;
-
-import java.util.Collection;
-
-/**
- * Savepoint version 0.
- *
- * <p>This format was introduced with Flink 1.1.0.
- */
-@SuppressWarnings("deprecation")
-public class SavepointV0 implements Savepoint {
-
-	/** The savepoint version. */
-	public static final int VERSION = 0;
-
-	/** The checkpoint ID */
-	private final long checkpointId;
-
-	/** The task states */
-	private final Collection<TaskState> taskStates;
-
-	public SavepointV0(long checkpointId, Collection<TaskState> taskStates) {
-		this.checkpointId = checkpointId;
-		this.taskStates = Preconditions.checkNotNull(taskStates, "Task States");
-	}
-
-	@Override
-	public int getVersion() {
-		return VERSION;
-	}
-
-	@Override
-	public long getCheckpointId() {
-		return checkpointId;
-	}
-
-	@Override
-	public Collection<org.apache.flink.runtime.checkpoint.TaskState> getTaskStates() {
-		// since checkpoints are never deserialized into this format,
-		// this method should never be called
-		throw new UnsupportedOperationException();
-	}
-
-	@Override
-	public Collection<MasterState> getMasterStates() {
-		// since checkpoints are never deserialized into this format,
-		// this method should never be called
-		throw new UnsupportedOperationException();
-	}
-
-	@Override
-	public Collection<OperatorState> getOperatorStates() {
-		return null;
-	}
-
-	@Override
-	public void dispose() throws Exception {
-		//NOP
-	}
-
-
-	public Collection<TaskState> getOldTaskStates() {
-		return taskStates;
-	}
-
-	@Override
-	public String toString() {
-		return "Savepoint(version=" + VERSION + ")";
-	}
-
-	@Override
-	public boolean equals(Object o) {
-		if (this == o) {
-			return true;
-		}
-
-		if (o == null || getClass() != o.getClass()) {
-			return false;
-		}
-
-		SavepointV0 that = (SavepointV0) o;
-		return checkpointId == that.checkpointId && getTaskStates().equals(that.getTaskStates());
-	}
-
-	@Override
-	public int hashCode() {
-		int result = (int) (checkpointId ^ (checkpointId >>> 32));
-		result = 31 * result + taskStates.hashCode();
-		return result;
-	}
-}


Mime
View raw message