From: srichter@apache.org
To: commits@flink.apache.org
Date: Thu, 24 Aug 2017 18:22:41 -0000
Message-Id: <3f9f8062e8ce4c319d5b504e7c0f98dc@git.apache.org>
In-Reply-To: <401decbe2ad743c0bab4d580446cfd9c@git.apache.org>
References: <401decbe2ad743c0bab4d580446cfd9c@git.apache.org>
Subject: [10/11] flink git commit: [FLINK-7461] Remove backwards compatibility with <= Flink 1.1

http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
index 11f14b9..78ac39c 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
@@ -43,11 +43,6 @@ import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.util.Preconditions;
 
-import org.apache.flink.shaded.guava18.com.google.common.base.Predicate;
-import org.apache.flink.shaded.guava18.com.google.common.collect.Iterators;
-
-import javax.annotation.Nullable;
-
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
@@ -55,7 +50,6 @@ import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
 import java.io.OptionalDataException;
 import java.io.Serializable;
-import java.lang.reflect.Field;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -716,9 +710,7 @@ public class NFA<T> implements Serializable {
 		return result;
 	}
 
-	//////////////////////	Fault-Tolerance / Migration	//////////////////////
-
-	private static final String BEGINNING_STATE_NAME = "$beginningState$";
+	//////////////////////	Fault-Tolerance	//////////////////////
 
 	private void readObject(ObjectInputStream ois) throws IOException, ClassNotFoundException {
 		ois.defaultReadObject();
@@ -729,103 +721,15 @@ public class NFA<T> implements Serializable {
 
 		final List<ComputationState<T>> readComputationStates = new ArrayList<>(numberComputationStates);
 
-		boolean afterMigration = false;
 		for (int i = 0; i < numberComputationStates; i++) {
 			ComputationState<T> computationState = readComputationState(ois);
-			if (computationState.getState().getName().equals(BEGINNING_STATE_NAME)) {
-				afterMigration = true;
-			}
-
 			readComputationStates.add(computationState);
 		}
 
-		if (afterMigration && !readComputationStates.isEmpty()) {
-			try {
-				// Backwards compatibility
-				this.computationStates.addAll(migrateNFA(readComputationStates));
-				final Field newSharedBufferField = NFA.class.getDeclaredField("eventSharedBuffer");
-				final Field sharedBufferField = NFA.class.getDeclaredField("sharedBuffer");
-				sharedBufferField.setAccessible(true);
-				newSharedBufferField.setAccessible(true);
-				newSharedBufferField.set(this, SharedBuffer.migrateSharedBuffer(this.sharedBuffer));
-				sharedBufferField.set(this, null);
-				sharedBufferField.setAccessible(false);
-				newSharedBufferField.setAccessible(false);
-			} catch (Exception e) {
-				throw new IllegalStateException("Could not migrate from earlier version", e);
-			}
-		} else {
-			this.computationStates.addAll(readComputationStates);
-		}
-
+		this.computationStates.addAll(readComputationStates);
 		nonDuplicatingTypeSerializer.clearReferences();
 	}
 
-	/**
-	 * Needed for backward compatibility. First migrates the {@link State} graph, see
-	 * {@link NFACompiler#migrateGraph(State)}. Then recreates the {@link ComputationState}s
-	 * with the new {@link State} graph.
-	 *
-	 * @param readStates computation states read from the snapshot
-	 * @return collection of migrated computation states
-	 */
-	private Collection<ComputationState<T>> migrateNFA(Collection<ComputationState<T>> readStates) {
-		final ArrayList<ComputationState<T>> computationStates = new ArrayList<>();
-
-		final State<T> startState = Iterators.find(
-			readStates.iterator(),
-			new Predicate<ComputationState<T>>() {
-				@Override
-				public boolean apply(@Nullable ComputationState<T> input) {
-					return input != null && input.getState().getName().equals(BEGINNING_STATE_NAME);
-				}
-			}).getState();
-
-		final Map<String, State<T>> convertedStates = NFACompiler.migrateGraph(startState);
-
-		for (ComputationState<T> readState : readStates) {
-			if (!readState.isStartState()) {
-				final String previousName = readState.getState().getName();
-				final String currentName = Iterators.find(
-					readState.getState().getStateTransitions().iterator(),
-					new Predicate<StateTransition<T>>() {
-						@Override
-						public boolean apply(@Nullable StateTransition<T> input) {
-							return input != null && input.getAction() == StateTransitionAction.TAKE;
-						}
-					}).getTargetState().getName();
-
-				final State<T> previousState = convertedStates.get(previousName);
-
-				computationStates.add(ComputationState.createState(
-					this,
-					convertedStates.get(currentName),
-					previousState,
-					readState.getEvent(),
-					0,
-					readState.getTimestamp(),
-					readState.getVersion(),
-					readState.getStartTimestamp()
-				));
-			}
-		}
-
-		final String startName = Iterators.find(convertedStates.values().iterator(), new Predicate<State<T>>() {
-			@Override
-			public boolean apply(@Nullable State<T> input) {
-				return input != null && input.isStart();
-			}
-		}).getName();
-
-		computationStates.add(ComputationState.createStartState(
-			this,
-			convertedStates.get(startName),
-			new DeweyNumber(this.startEventCounter)));
-
-		this.states.clear();
-		this.states.addAll(convertedStates.values());
-
-		return computationStates;
-	}
-
 	@SuppressWarnings("unchecked")
 	private ComputationState<T> readComputationState(ObjectInputStream ois) throws IOException, ClassNotFoundException {
 		final State<T> state = (State<T>) ois.readObject();
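The migration branch deleted above relies on Java's private readObject(ObjectInputStream) hook, which runs during deserialization and may rewrite fields before the restored object reaches the caller. A minimal, self-contained sketch of that mechanism (the class and field names are illustrative, not Flink's):

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.Serializable;

public class VersionedBox implements Serializable {
	private static final long serialVersionUID = 1L;

	// Field whose on-disk encoding changed between versions.
	private String payload;

	public VersionedBox(String payload) {
		this.payload = payload;
	}

	private void readObject(ObjectInputStream ois) throws IOException, ClassNotFoundException {
		ois.defaultReadObject(); // restore fields exactly as written
		// Post-process the restored state, e.g. upgrade a legacy encoding.
		if (payload != null && payload.startsWith("legacy:")) {
			payload = payload.substring("legacy:".length());
		}
	}
}
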

http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
index c6f69b9..c36e7df 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.cep.nfa;
 
-import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeutils.CompatibilityResult;
 import org.apache.flink.api.common.typeutils.CompatibilityUtil;
 import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
@@ -335,47 +334,6 @@ public class SharedBuffer<K extends Serializable, V> implements Serializable {
 		this.pages = pages;
 	}
 
-	/**
-	 * For backward compatibility only. Previously the key in {@link SharedBuffer} was {@link State}.
-	 * Now it is {@link String}.
-	 */
-	@Internal
-	static <T> SharedBuffer<String, T> migrateSharedBuffer(SharedBuffer<State<T>, T> buffer) {
-
-		final Map<String, SharedBufferPage<String, T>> pageMap = new HashMap<>();
-		final Map<SharedBufferEntry<State<T>, T>, SharedBufferEntry<String, T>> entries = new HashMap<>();
-
-		for (Map.Entry<State<T>, SharedBufferPage<State<T>, T>> page : buffer.pages.entrySet()) {
-			final SharedBufferPage<String, T> newPage = new SharedBufferPage<>(page.getKey().getName());
-			pageMap.put(newPage.getKey(), newPage);
-
-			for (Map.Entry<ValueTimeWrapper<T>, SharedBufferEntry<State<T>, T>> pageEntry : page.getValue().entries.entrySet()) {
-				final SharedBufferEntry<String, T> newSharedBufferEntry = new SharedBufferEntry<>(
-					pageEntry.getKey(),
-					newPage);
-				newSharedBufferEntry.referenceCounter = pageEntry.getValue().referenceCounter;
-				entries.put(pageEntry.getValue(), newSharedBufferEntry);
-				newPage.entries.put(pageEntry.getKey(), newSharedBufferEntry);
-			}
-		}
-
-		for (Map.Entry<State<T>, SharedBufferPage<State<T>, T>> page : buffer.pages.entrySet()) {
-			for (Map.Entry<ValueTimeWrapper<T>, SharedBufferEntry<State<T>, T>> pageEntry : page.getValue().entries.entrySet()) {
-				final SharedBufferEntry<String, T> newEntry = entries.get(pageEntry.getValue());
-				for (SharedBufferEdge<State<T>, T> edge : pageEntry.getValue().edges) {
-					final SharedBufferEntry<String, T> targetNewEntry = entries.get(edge.getTarget());
-
-					final SharedBufferEdge<String, T> newEdge = new SharedBufferEdge<>(
-						targetNewEntry,
-						edge.getVersion());
-					newEntry.edges.add(newEdge);
-				}
-			}
-		}
-
-		return new SharedBuffer<>(buffer.valueSerializer, pageMap);
-	}
-
 	private SharedBufferEntry<K, V> get(
 		final K key,
 		final V value,
@@ -1177,76 +1135,4 @@ public class SharedBuffer<K extends Serializable, V> implements Serializable {
 			return CompatibilityResult.requiresMigration();
 		}
 	}
-
-	//////////////	Java Serialization methods for backwards compatibility	//////////////
-
-	private void readObject(ObjectInputStream ois) throws IOException, ClassNotFoundException {
-		DataInputViewStreamWrapper source = new DataInputViewStreamWrapper(ois);
-		ArrayList<SharedBufferEntry<K, V>> entryList = new ArrayList<>();
-		ois.defaultReadObject();
-
-		this.pages = new HashMap<>();
-
-		int numberPages = ois.readInt();
-
-		for (int i = 0; i < numberPages; i++) {
-			// key of the page
-			@SuppressWarnings("unchecked")
-			K key = (K) ois.readObject();
-
-			SharedBufferPage<K, V> page = new SharedBufferPage<>(key);
-
-			pages.put(key, page);
-
-			int numberEntries = ois.readInt();
-
-			for (int j = 0; j < numberEntries; j++) {
-				// restore the SharedBufferEntries for the given page
-				V value = valueSerializer.deserialize(source);
-				long timestamp = ois.readLong();
-
-				ValueTimeWrapper<V> valueTimeWrapper = new ValueTimeWrapper<>(value, timestamp, 0);
-				SharedBufferEntry<K, V> sharedBufferEntry = new SharedBufferEntry<>(valueTimeWrapper, page);
-
-				sharedBufferEntry.referenceCounter = ois.readInt();
-
-				page.entries.put(valueTimeWrapper, sharedBufferEntry);
-
-				entryList.add(sharedBufferEntry);
-			}
-		}
-
-		// read the edges of the shared buffer entries
-		int numberEdges = ois.readInt();
-
-		for (int j = 0; j < numberEdges; j++) {
-			int sourceIndex = ois.readInt();
-			int targetIndex = ois.readInt();
-
-			if (sourceIndex >= entryList.size() || sourceIndex < 0) {
-				throw new RuntimeException("Could not find source entry with index " + sourceIndex +
-					". This indicates a corrupted state.");
-			} else {
-				// We've already deserialized the shared buffer entry. Simply read its ID and
-				// retrieve the buffer entry from the list of entries
-				SharedBufferEntry<K, V> sourceEntry = entryList.get(sourceIndex);
-
-				final DeweyNumber version = (DeweyNumber) ois.readObject();
-				final SharedBufferEntry<K, V> target;
-
-				if (targetIndex >= 0) {
-					if (targetIndex >= entryList.size()) {
-						throw new RuntimeException("Could not find target entry with index " + targetIndex +
-							". This indicates a corrupted state.");
-					} else {
-						target = entryList.get(targetIndex);
-					}
-				} else {
-					target = null;
-				}
-
-				sourceEntry.edges.add(new SharedBufferEdge<>(target, version));
-			}
-		}
-	}
 }
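The deleted migrateSharedBuffer re-keys the buffer's pages from State<T> to the state's name, and it does so in two passes: first copy every entry while recording an old-to-new mapping, then rewire the edges against the new instances so no edge points at a stale object. A generic sketch of that two-pass re-keying shape (toy Node type, not Flink's API):

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;

public class RekeyExample {

	/** Minimal graph node keyed by an object; we want it keyed by the object's name. */
	static final class Node {
		final String name;
		final List<Node> edges = new ArrayList<>();
		Node(String name) { this.name = name; }
	}

	/** Two-pass migration: copy nodes first, then rewire edges via an old-to-new map. */
	static Map<String, Node> rekey(Collection<Node> oldNodes) {
		Map<Node, Node> oldToNew = new IdentityHashMap<>();
		Map<String, Node> byName = new HashMap<>();

		// Pass 1: create all replacement nodes so every edge target exists.
		for (Node old : oldNodes) {
			Node copy = new Node(old.name);
			oldToNew.put(old, copy);
			byName.put(copy.name, copy);
		}
		// Pass 2: rewire edges against the new instances.
		for (Node old : oldNodes) {
			Node copy = oldToNew.get(old);
			for (Node target : old.edges) {
				copy.edges.add(oldToNew.get(target));
			}
		}
		return byName;
	}
}
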

http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
index 593c94f..5698de6 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.cep.nfa.compiler;
 
-import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.cep.nfa.NFA;
@@ -36,11 +35,6 @@ import org.apache.flink.cep.pattern.conditions.IterativeCondition;
 import org.apache.flink.cep.pattern.conditions.NotCondition;
 import org.apache.flink.streaming.api.windowing.time.Time;
 
-import org.apache.flink.shaded.guava18.com.google.common.base.Predicate;
-import org.apache.flink.shaded.guava18.com.google.common.collect.Iterators;
-
-import javax.annotation.Nullable;
-
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -858,114 +852,6 @@ public class NFACompiler {
 	}
 
 	/**
-	 * Used for migrating CEP graphs prior to 1.3. It removes the dummy start, adds the dummy end, and translates all
-	 * states to consuming ones by moving all TAKEs and IGNOREs to the next state. This method assumes each state
-	 * has at most one TAKE and one IGNORE and the name of each state is unique. No PROCEED transition is allowed!
-	 *
-	 * @param oldStartState dummy start state of the old graph
-	 * @param <T> type of events
-	 * @return map of new states, where the key is the name of a state and the value is the state itself
-	 */
-	@Internal
-	public static <T> Map<String, State<T>> migrateGraph(State<T> oldStartState) {
-		State<T> oldFirst = oldStartState;
-		State<T> oldSecond = oldStartState.getStateTransitions().iterator().next().getTargetState();
-
-		StateTransition<T> oldFirstToSecondTake = Iterators.find(
-			oldFirst.getStateTransitions().iterator(),
-			new Predicate<StateTransition<T>>() {
-				@Override
-				public boolean apply(@Nullable StateTransition<T> input) {
-					return input != null && input.getAction() == StateTransitionAction.TAKE;
-				}
-			});
-
-		StateTransition<T> oldFirstIgnore = Iterators.find(
-			oldFirst.getStateTransitions().iterator(),
-			new Predicate<StateTransition<T>>() {
-				@Override
-				public boolean apply(@Nullable StateTransition<T> input) {
-					return input != null && input.getAction() == StateTransitionAction.IGNORE;
-				}
-			}, null);
-
-		StateTransition<T> oldSecondToThirdTake = Iterators.find(
-			oldSecond.getStateTransitions().iterator(),
-			new Predicate<StateTransition<T>>() {
-				@Override
-				public boolean apply(@Nullable StateTransition<T> input) {
-					return input != null && input.getAction() == StateTransitionAction.TAKE;
-				}
-			}, null);
-
-		final Map<String, State<T>> convertedStates = new HashMap<>();
-		State<T> newSecond;
-		State<T> newFirst = new State<>(oldSecond.getName(), State.StateType.Start);
-		convertedStates.put(newFirst.getName(), newFirst);
-		while (oldSecondToThirdTake != null) {
-
-			newSecond = new State<>(oldSecondToThirdTake.getTargetState().getName(), State.StateType.Normal);
-			convertedStates.put(newSecond.getName(), newSecond);
-			newFirst.addTake(newSecond, oldFirstToSecondTake.getCondition());
-
-			if (oldFirstIgnore != null) {
-				newFirst.addIgnore(oldFirstIgnore.getCondition());
-			}
-
-			oldFirst = oldSecond;
-
-			oldFirstToSecondTake = Iterators.find(
-				oldFirst.getStateTransitions().iterator(),
-				new Predicate<StateTransition<T>>() {
-					@Override
-					public boolean apply(@Nullable StateTransition<T> input) {
-						return input != null && input.getAction() == StateTransitionAction.TAKE;
-					}
-				});
-
-			oldFirstIgnore = Iterators.find(
-				oldFirst.getStateTransitions().iterator(),
-				new Predicate<StateTransition<T>>() {
-					@Override
-					public boolean apply(@Nullable StateTransition<T> input) {
-						return input != null && input.getAction() == StateTransitionAction.IGNORE;
-					}
-				}, null);
-
-			oldSecond = oldSecondToThirdTake.getTargetState();
-
-			oldSecondToThirdTake = Iterators.find(
-				oldSecond.getStateTransitions().iterator(),
-				new Predicate<StateTransition<T>>() {
-					@Override
-					public boolean apply(@Nullable StateTransition<T> input) {
-						return input != null && input.getAction() == StateTransitionAction.TAKE;
-					}
-				}, null);
-
-			newFirst = newSecond;
-		}
-
-		final State<T> endingState = new State<>(ENDING_STATE_NAME, State.StateType.Final);
-
-		newFirst.addTake(endingState, oldFirstToSecondTake.getCondition());
-
-		if (oldFirstIgnore != null) {
-			newFirst.addIgnore(oldFirstIgnore.getCondition());
-		}
-
-		convertedStates.put(endingState.getName(), endingState);
-
-		return convertedStates;
-	}
-
-	/**
 	 * Factory interface for {@link NFA}.
 	 *
 	 * @param <T> Type of the input events which are processed by the NFA
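The deleted migration code leaned on shaded Guava's Iterators.find with anonymous Predicate classes, which is why the commit can also drop those imports. On Java 8+, the same lookup is expressible with streams; a rough stand-in for Iterators.find(iterator, predicate, defaultValue) would be the following (illustrative helper, not part of this commit):

import java.util.Iterator;
import java.util.Spliterators;
import java.util.function.Predicate;
import java.util.stream.StreamSupport;

public final class FindFirst {

	/** Returns the first element matching the predicate, or defaultValue if none matches. */
	static <T> T find(Iterator<T> iterator, Predicate<T> predicate, T defaultValue) {
		return StreamSupport.stream(
				Spliterators.spliteratorUnknownSize(iterator, 0), false)
			.filter(predicate)
			.findFirst()
			.orElse(defaultValue);
	}
}
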

http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
index 7556d9f..257d3e7 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
@@ -24,48 +24,29 @@
 import org.apache.flink.api.common.state.MapState;
 import org.apache.flink.api.common.state.MapStateDescriptor;
 import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.api.common.typeutils.CompatibilityResult;
-import org.apache.flink.api.common.typeutils.CompatibilityUtil;
-import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
-import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
-import org.apache.flink.api.common.typeutils.base.CollectionSerializerConfigSnapshot;
 import org.apache.flink.api.common.typeutils.base.ListSerializer;
 import org.apache.flink.api.common.typeutils.base.LongSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.cep.EventComparator;
 import org.apache.flink.cep.nfa.NFA;
 import org.apache.flink.cep.nfa.compiler.NFACompiler;
-import org.apache.flink.core.fs.FSDataInputStream;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataInputViewStreamWrapper;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.migration.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer;
 import org.apache.flink.runtime.state.StateInitializationContext;
 import org.apache.flink.runtime.state.VoidNamespace;
 import org.apache.flink.runtime.state.VoidNamespaceSerializer;
 import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
-import org.apache.flink.streaming.api.operators.CheckpointedRestoringOperator;
 import org.apache.flink.streaming.api.operators.InternalTimer;
 import org.apache.flink.streaming.api.operators.InternalTimerService;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.Triggerable;
-import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
-import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.util.Migration;
 import org.apache.flink.util.Preconditions;
 
 import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
 import java.util.PriorityQueue;
 import java.util.stream.Stream;
 import java.util.stream.StreamSupport;
@@ -80,23 +61,17 @@ import java.util.stream.StreamSupport;
  * @param <IN> Type of the input elements
  * @param <KEY> Type of the key on which the input stream is keyed
  * @param <OUT> Type of the output elements
- * @param <F> user function that can be applied to matching sequences or timed out sequences
  */
 public abstract class AbstractKeyedCEPPatternOperator<IN, KEY, OUT, F extends Function>
 	extends AbstractUdfStreamOperator<OUT, F>
-	implements OneInputStreamOperator<IN, OUT>, Triggerable<KEY, VoidNamespace>, CheckpointedRestoringOperator {
+	implements OneInputStreamOperator<IN, OUT>, Triggerable<KEY, VoidNamespace> {
 
 	private static final long serialVersionUID = -4166778210774160757L;
 
-	private static final int INITIAL_PRIORITY_QUEUE_CAPACITY = 11;
-
 	private final boolean isProcessingTime;
 
 	private final TypeSerializer<IN> inputSerializer;
 
-	// necessary to serialize the set of seen keys
-	private final TypeSerializer<KEY> keySerializer;
-
 	/////////////// State ///////////////
 
 	private static final String NFA_OPERATOR_STATE_NAME = "nfaOperatorStateName";
@@ -115,12 +90,6 @@ public abstract class AbstractKeyedCEPPatternOperator
 	private final EventComparator<IN> comparator;
 
 	public AbstractKeyedCEPPatternOperator(
@@ -135,10 +104,7 @@ public abstract class AbstractKeyedCEPPatternOperator
-		ValueState<NFA<IN>> oldNfaOperatorState = getRuntimeContext().getState(
-			new ValueStateDescriptor<>("nfaOperatorState", new NFA.Serializer()));
-
-		ValueState<PriorityQueue<StreamRecord<IN>>> oldPriorityQueueOperatorState =
-			getRuntimeContext().getState(
-				new ValueStateDescriptor<>(
-					"priorityQueueStateName",
-					new PriorityQueueSerializer<>(
-						((TypeSerializer) new StreamElementSerializer<>(inputSerializer)),
-						new PriorityQueueStreamRecordFactory()
-					)
-				)
-			);
-
-		if (migratingFromOldKeyedOperator) {
-			int numberEntries = inputView.readInt();
-			for (int i = 0; i < numberEntries; i++) {
-				KEY key = keySerializer.deserialize(inputView);
-				setCurrentKey(key);
-				saveRegisterWatermarkTimer();
-
-				NFA<IN> nfa = oldNfaOperatorState.value();
-				oldNfaOperatorState.clear();
-				nfaOperatorState.update(nfa);
-
-				PriorityQueue<StreamRecord<IN>> priorityQueue = oldPriorityQueueOperatorState.value();
-				if (priorityQueue != null && !priorityQueue.isEmpty()) {
-					Map<Long, List<IN>> elementMap = new HashMap<>();
-					for (StreamRecord<IN> record: priorityQueue) {
-						long timestamp = record.getTimestamp();
-						IN element = record.getValue();
-
-						List<IN> elements = elementMap.get(timestamp);
-						if (elements == null) {
-							elements = new ArrayList<>();
-							elementMap.put(timestamp, elements);
-						}
-						elements.add(element);
-					}
-
-					// write the old state into the new one.
-					for (Map.Entry<Long, List<IN>> entry: elementMap.entrySet()) {
-						elementQueueState.put(entry.getKey(), entry.getValue());
-					}
-
-					// clear the old state
-					oldPriorityQueueOperatorState.clear();
-				}
-			}
-		} else {
-
-			final ObjectInputStream ois = new ObjectInputStream(in);
-
-			// retrieve the NFA
-			@SuppressWarnings("unchecked")
-			NFA<IN> nfa = (NFA<IN>) ois.readObject();
-
-			// retrieve the elements that were pending in the priority queue
-			MultiplexingStreamRecordSerializer<IN> recordSerializer = new MultiplexingStreamRecordSerializer<>(inputSerializer);
-
-			Map<Long, List<IN>> elementMap = new HashMap<>();
-			int entries = ois.readInt();
-			for (int i = 0; i < entries; i++) {
-				StreamElement streamElement = recordSerializer.deserialize(inputView);
-				StreamRecord<IN> record = streamElement.asRecord();
-
-				long timestamp = record.getTimestamp();
-				IN element = record.getValue();
-
-				List<IN> elements = elementMap.get(timestamp);
-				if (elements == null) {
-					elements = new ArrayList<>();
-					elementMap.put(timestamp, elements);
-				}
-				elements.add(element);
-			}
-
-			// finally register the retrieved state with the new keyed state.
-			setCurrentKey((byte) 0);
-			nfaOperatorState.update(nfa);
-
-			// write the priority queue to the new map state.
-			for (Map.Entry<Long, List<IN>> entry: elementMap.entrySet()) {
-				elementQueueState.put(entry.getKey(), entry.getValue());
-			}
-
-			if (!isProcessingTime) {
-				// this is relevant only for event/ingestion time
-				setCurrentKey((byte) 0);
-				saveRegisterWatermarkTimer();
-			}
-			ois.close();
-		}
-	}
-
-	//////////////////////	Utility Classes	//////////////////////
-
-	/**
-	 * Custom type serializer implementation to serialize priority queues.
-	 *
-	 * @param <T> Type of the priority queue's elements
-	 */
-	private static class PriorityQueueSerializer<T> extends TypeSerializer<PriorityQueue<T>> {
-
-		private static final long serialVersionUID = -231980397616187715L;
-
-		private final TypeSerializer<T> elementSerializer;
-		private final PriorityQueueFactory<T> factory;
-
-		PriorityQueueSerializer(final TypeSerializer<T> elementSerializer, final PriorityQueueFactory<T> factory) {
-			this.elementSerializer = elementSerializer;
-			this.factory = factory;
-		}
-
-		@Override
-		public boolean isImmutableType() {
-			return false;
-		}
-
-		@Override
-		public TypeSerializer<PriorityQueue<T>> duplicate() {
-			return new PriorityQueueSerializer<>(elementSerializer.duplicate(), factory);
-		}
-
-		@Override
-		public PriorityQueue<T> createInstance() {
-			return factory.createPriorityQueue();
-		}
-
-		@Override
-		public PriorityQueue<T> copy(PriorityQueue<T> from) {
-			PriorityQueue<T> result = factory.createPriorityQueue();
-
-			for (T element: from) {
-				result.offer(elementSerializer.copy(element));
-			}
-
-			return result;
-		}
-
-		@Override
-		public PriorityQueue<T> copy(PriorityQueue<T> from, PriorityQueue<T> reuse) {
-			reuse.clear();
-
-			for (T element: from) {
-				reuse.offer(elementSerializer.copy(element));
-			}
-
-			return reuse;
-		}
-
-		@Override
-		public int getLength() {
-			return 0;
-		}
-
-		@Override
-		public void serialize(PriorityQueue<T> record, DataOutputView target) throws IOException {
-			target.writeInt(record.size());
-
-			for (T element: record) {
-				elementSerializer.serialize(element, target);
-			}
-		}
-
-		@Override
-		public PriorityQueue<T> deserialize(DataInputView source) throws IOException {
-			PriorityQueue<T> result = factory.createPriorityQueue();
-
-			return deserialize(result, source);
-		}
-
-		@Override
-		public PriorityQueue<T> deserialize(PriorityQueue<T> reuse, DataInputView source) throws IOException {
-			reuse.clear();
-
-			int numberEntries = source.readInt();
-
-			for (int i = 0; i < numberEntries; i++) {
-				reuse.offer(elementSerializer.deserialize(source));
-			}
-
-			return reuse;
-		}
-
-		@Override
-		public void copy(DataInputView source, DataOutputView target) throws IOException {
-
-		}
-
-		@Override
-		public boolean equals(Object obj) {
-			if (obj instanceof PriorityQueueSerializer) {
-				@SuppressWarnings("unchecked")
-				PriorityQueueSerializer<T> other = (PriorityQueueSerializer<T>) obj;
-
-				return factory.equals(other.factory) && elementSerializer.equals(other.elementSerializer);
-			} else {
-				return false;
-			}
-		}
-
-		@Override
-		public boolean canEqual(Object obj) {
-			return obj instanceof PriorityQueueSerializer;
-		}
-
-		@Override
-		public int hashCode() {
-			return Objects.hash(factory, elementSerializer);
-		}
-
-		// --------------------------------------------------------------------------------------------
-		//  Serializer configuration snapshotting & compatibility
-		// --------------------------------------------------------------------------------------------
-
-		@Override
-		public TypeSerializerConfigSnapshot snapshotConfiguration() {
-			return new CollectionSerializerConfigSnapshot<>(elementSerializer);
-		}
-
-		@Override
-		public CompatibilityResult<PriorityQueue<T>> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
-			if (configSnapshot instanceof CollectionSerializerConfigSnapshot) {
-				Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot> previousElemSerializerAndConfig =
-					((CollectionSerializerConfigSnapshot) configSnapshot).getSingleNestedSerializerAndConfig();
-
-				CompatibilityResult<T> compatResult = CompatibilityUtil.resolveCompatibilityResult(
-					previousElemSerializerAndConfig.f0,
-					UnloadableDummyTypeSerializer.class,
-					previousElemSerializerAndConfig.f1,
-					elementSerializer);
-
-				if (!compatResult.isRequiresMigration()) {
-					return CompatibilityResult.compatible();
-				} else if (compatResult.getConvertDeserializer() != null) {
-					return CompatibilityResult.requiresMigration(
-						new PriorityQueueSerializer<>(
-							new TypeDeserializerAdapter<>(compatResult.getConvertDeserializer()), factory));
-				}
-			}
-
-			return CompatibilityResult.requiresMigration();
-		}
-	}
-
-	private interface PriorityQueueFactory<T> extends Serializable {
-		PriorityQueue<T> createPriorityQueue();
-	}
-
-	private static class PriorityQueueStreamRecordFactory<T> implements PriorityQueueFactory<StreamRecord<T>> {
-
-		private static final long serialVersionUID = 1254766984454616593L;
-
-		@Override
-		public PriorityQueue<StreamRecord<T>> createPriorityQueue() {
-			return new PriorityQueue<StreamRecord<T>>(INITIAL_PRIORITY_QUEUE_CAPACITY, new StreamRecordComparator());
-		}
-
-		@Override
-		public boolean equals(Object obj) {
-			return obj instanceof PriorityQueueStreamRecordFactory;
-		}
-
-		@Override
-		public int hashCode() {
-			return getClass().hashCode();
-		}
-	}
-
 	//////////////////////	Testing Methods	//////////////////////
 
 	@VisibleForTesting
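The deleted restore path groups buffered StreamRecords by timestamp into a Map<Long, List<IN>> before writing them into the new MapState. The manual null-check-then-put it uses predates Java 8; a standalone sketch of the same grouping with computeIfAbsent (toy Record type, not the operator's code):

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class GroupByTimestamp {

	static final class Record {
		final long timestamp;
		final String value;
		Record(long timestamp, String value) {
			this.timestamp = timestamp;
			this.value = value;
		}
	}

	static Map<Long, List<String>> group(Collection<Record> records) {
		Map<Long, List<String>> byTime = new HashMap<>();
		for (Record r : records) {
			// Create the bucket on first access, then append.
			byTime.computeIfAbsent(r.timestamp, t -> new ArrayList<>()).add(r.value);
		}
		return byTime;
	}
}
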

http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java
deleted file mode 100644
index 843d668..0000000
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java
+++ /dev/null
@@ -1,346 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.cep.operator;
-
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeutils.base.ByteSerializer;
-import org.apache.flink.api.java.functions.NullByteKeySelector;
-import org.apache.flink.cep.Event;
-import org.apache.flink.cep.SubEvent;
-import org.apache.flink.cep.nfa.NFA;
-import org.apache.flink.cep.nfa.compiler.NFACompiler;
-import org.apache.flink.cep.pattern.Pattern;
-import org.apache.flink.cep.pattern.conditions.SimpleCondition;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.api.windowing.time.Time;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
-import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
-import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
-
-import org.junit.Test;
-
-import java.net.URL;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentLinkedQueue;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-/**
- * Tests for migration from 1.1.x to 1.3.x.
- */
-public class CEPMigration11to13Test {
-
-	private static String getResourceFilename(String filename) {
-		ClassLoader cl = CEPMigration11to13Test.class.getClassLoader();
-		URL resource = cl.getResource(filename);
-		if (resource == null) {
-			throw new NullPointerException("Missing snapshot resource.");
-		}
-		return resource.getFile();
-	}
-
-	@Test
-	public void testKeyedCEPOperatorMigratation() throws Exception {
-
-		final Event startEvent = new Event(42, "start", 1.0);
-		final SubEvent middleEvent = new SubEvent(42, "foo", 1.0, 10.0);
-		final Event endEvent = new Event(42, "end", 1.0);
-
-		// uncomment these lines for regenerating the snapshot on Flink 1.1
-		/*
-		OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness = new OneInputStreamOperatorTestHarness<>(
-			new KeyedCepOperator<>(
-				Event.createTypeSerializer(),
-				false,
-				keySelector,
-				IntSerializer.INSTANCE,
-				new NFAFactory()));
-		harness.configureForKeyedStream(keySelector, BasicTypeInfo.INT_TYPE_INFO);
-		harness.open();
-		harness.processElement(new StreamRecord<Event>(startEvent, 1));
-		harness.processElement(new StreamRecord<Event>(new Event(42, "foobar", 1.0), 2));
-		harness.processElement(new StreamRecord<Event>(new SubEvent(42, "barfoo", 1.0, 5.0), 3));
-		harness.processWatermark(new Watermark(2));
-
-		harness.processElement(new StreamRecord<Event>(middleEvent, 3));
-
-		// simulate snapshot/restore with empty element queue but NFA state
-		StreamTaskState snapshot = harness.snapshot(1, 1);
-		FileOutputStream out = new FileOutputStream(
-				"src/test/resources/cep-keyed-1_1-snapshot");
-		ObjectOutputStream oos = new ObjectOutputStream(out);
-		oos.writeObject(snapshot);
-		out.close();
-		harness.close();
-		*/
-
-		OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness = CepOperatorTestUtilities.getCepTestHarness(
-			CepOperatorTestUtilities.getKeyedCepOpearator(false, new NFAFactory()));
-
-		try {
-			harness.setup();
-			harness.initializeStateFromLegacyCheckpoint(getResourceFilename("cep-keyed-1_1-snapshot"));
-			harness.open();
-
-			harness.processElement(new StreamRecord<>(new Event(42, "start", 1.0), 4));
-			harness.processElement(new StreamRecord<>(endEvent, 5));
-
-			harness.processWatermark(new Watermark(20));
-
-			ConcurrentLinkedQueue<Object> result = harness.getOutput();
-
-			// watermark and the result
-			assertEquals(2, result.size());
-
-			Object resultObject = result.poll();
-			assertTrue(resultObject instanceof StreamRecord);
-			StreamRecord<?> resultRecord = (StreamRecord<?>) resultObject;
-			assertTrue(resultRecord.getValue() instanceof Map);
-
-			@SuppressWarnings("unchecked")
-			Map<String, List<Event>> patternMap =
-				(Map<String, List<Event>>) resultRecord.getValue();
-
-			assertEquals(startEvent, patternMap.get("start").get(0));
-			assertEquals(middleEvent, patternMap.get("middle").get(0));
-			assertEquals(endEvent, patternMap.get("end").get(0));
-
-			// and now go for a checkpoint with the new serializers
-
-			final Event startEvent1 = new Event(42, "start", 2.0);
-			final SubEvent middleEvent1 = new SubEvent(42, "foo", 1.0, 11.0);
-			final Event endEvent1 = new Event(42, "end", 2.0);
-
-			harness.processElement(new StreamRecord<Event>(startEvent1, 21));
-			harness.processElement(new StreamRecord<Event>(middleEvent1, 23));
-
-			// simulate snapshot/restore with some elements in internal sorting queue
-			OperatorStateHandles snapshot = harness.snapshot(1L, 1L);
-			harness.close();
-
-			harness = CepOperatorTestUtilities.getCepTestHarness(CepOperatorTestUtilities.getKeyedCepOpearator(
-				false,
-				new NFAFactory()));
-
-			harness.setup();
-			harness.initializeState(snapshot);
-			harness.open();
-
-			harness.processElement(new StreamRecord<>(endEvent1, 25));
-
-			harness.processWatermark(new Watermark(50));
-
-			result = harness.getOutput();
-
-			// watermark and the result
-			assertEquals(2, result.size());
-
-			Object resultObject1 = result.poll();
-			assertTrue(resultObject1 instanceof StreamRecord);
-			StreamRecord<?> resultRecord1 = (StreamRecord<?>) resultObject1;
-			assertTrue(resultRecord1.getValue() instanceof Map);
-
-			@SuppressWarnings("unchecked")
-			Map<String, List<Event>> patternMap1 =
-				(Map<String, List<Event>>) resultRecord1.getValue();
-
-			assertEquals(startEvent1, patternMap1.get("start").get(0));
-			assertEquals(middleEvent1, patternMap1.get("middle").get(0));
-			assertEquals(endEvent1, patternMap1.get("end").get(0));
-		} finally {
-			harness.close();
-		}
-	}
-
-	@Test
-	public void testNonKeyedCEPFunctionMigration() throws Exception {
-
-		final Event startEvent = new Event(42, "start", 1.0);
-		final SubEvent middleEvent = new SubEvent(42, "foo", 1.0, 10.0);
-		final Event endEvent = new Event(42, "end", 1.0);
-
-		// uncomment these lines for regenerating the snapshot on Flink 1.1
-		/*
-		OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness = new OneInputStreamOperatorTestHarness<>(
-			new CEPPatternOperator<>(
-				Event.createTypeSerializer(),
-				false,
-				new NFAFactory()));
-		harness.open();
-		harness.processElement(new StreamRecord<Event>(startEvent, 1));
-		harness.processElement(new StreamRecord<Event>(new Event(42, "foobar", 1.0), 2));
-		harness.processElement(new StreamRecord<Event>(new SubEvent(42, "barfoo", 1.0, 5.0), 3));
-		harness.processWatermark(new Watermark(2));
-
-		harness.processElement(new StreamRecord<Event>(middleEvent, 3));
-
-		// simulate snapshot/restore with empty element queue but NFA state
-		StreamTaskState snapshot = harness.snapshot(1, 1);
-		FileOutputStream out = new FileOutputStream(
-				"src/test/resources/cep-non-keyed-1.1-snapshot");
-		ObjectOutputStream oos = new ObjectOutputStream(out);
-		oos.writeObject(snapshot);
-		out.close();
-		harness.close();
-		*/
-
-		NullByteKeySelector keySelector = new NullByteKeySelector();
-
-		OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness =
-			new KeyedOneInputStreamOperatorTestHarness<>(
-				CepOperatorTestUtilities.getKeyedCepOpearator(false, new NFAFactory(), ByteSerializer.INSTANCE, false, null),
-				keySelector,
-				BasicTypeInfo.BYTE_TYPE_INFO);
-
-		try {
-			harness.setup();
-			harness.initializeStateFromLegacyCheckpoint(
-				getResourceFilename("cep-non-keyed-1.1-snapshot"));
-			harness.open();
-
-			harness.processElement(new StreamRecord<>(new Event(42, "start", 1.0), 4));
-			harness.processElement(new StreamRecord<>(endEvent, 5));
-
-			harness.processWatermark(new Watermark(20));
-
-			ConcurrentLinkedQueue<Object> result = harness.getOutput();
-
-			// watermark and the result
-			assertEquals(2, result.size());
-
-			Object resultObject = result.poll();
-			assertTrue(resultObject instanceof StreamRecord);
-			StreamRecord<?> resultRecord = (StreamRecord<?>) resultObject;
-			assertTrue(resultRecord.getValue() instanceof Map);
-
-			@SuppressWarnings("unchecked")
-			Map<String, List<Event>> patternMap =
-				(Map<String, List<Event>>) resultRecord.getValue();
-
-			assertEquals(startEvent, patternMap.get("start").get(0));
-			assertEquals(middleEvent, patternMap.get("middle").get(0));
-			assertEquals(endEvent, patternMap.get("end").get(0));
-
-			// and now go for a checkpoint with the new serializers
-
-			final Event startEvent1 = new Event(42, "start", 2.0);
-			final SubEvent middleEvent1 = new SubEvent(42, "foo", 1.0, 11.0);
-			final Event endEvent1 = new Event(42, "end", 2.0);
-
-			harness.processElement(new StreamRecord<Event>(startEvent1, 21));
-			harness.processElement(new StreamRecord<Event>(middleEvent1, 23));
-
-			// simulate snapshot/restore with some elements in internal sorting queue
-			OperatorStateHandles snapshot = harness.snapshot(1L, 1L);
-			harness.close();
-
-			harness = new KeyedOneInputStreamOperatorTestHarness<>(
-				CepOperatorTestUtilities.getKeyedCepOpearator(false, new NFAFactory(), ByteSerializer.INSTANCE),
-				keySelector,
-				BasicTypeInfo.BYTE_TYPE_INFO);
-
-			harness.setup();
-			harness.initializeState(snapshot);
-			harness.open();
-
-			harness.processElement(new StreamRecord<>(endEvent1, 25));
-
-			harness.processWatermark(new Watermark(50));
-
-			result = harness.getOutput();
-
-			// watermark and the result
-			assertEquals(2, result.size());
-
-			Object resultObject1 = result.poll();
-			assertTrue(resultObject1 instanceof StreamRecord);
-			StreamRecord<?> resultRecord1 = (StreamRecord<?>) resultObject1;
-			assertTrue(resultRecord1.getValue() instanceof Map);
-
-			@SuppressWarnings("unchecked")
-			Map<String, List<Event>> patternMap1 =
-				(Map<String, List<Event>>) resultRecord1.getValue();
-
-			assertEquals(startEvent1, patternMap1.get("start").get(0));
-			assertEquals(middleEvent1, patternMap1.get("middle").get(0));
-			assertEquals(endEvent1, patternMap1.get("end").get(0));
-		} finally {
-			harness.close();
-		}
-	}
-
-	private static class NFAFactory implements NFACompiler.NFAFactory<Event> {
-
-		private static final long serialVersionUID = 1173020762472766713L;
-
-		private final boolean handleTimeout;
-
-		private NFAFactory() {
-			this(false);
-		}
-
-		private NFAFactory(boolean handleTimeout) {
-			this.handleTimeout = handleTimeout;
-		}
-
-		@Override
-		public NFA<Event> createNFA() {
-
-			Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new StartFilter())
-				.followedBy("middle").subtype(SubEvent.class).where(new MiddleFilter())
-				.followedBy("end").where(new EndFilter())
-				// add a window timeout to test whether timestamps of elements in the
-				// priority queue in CEP operator are correctly checkpointed/restored
-				.within(Time.milliseconds(10L));
-
-			return NFACompiler.compile(pattern, Event.createTypeSerializer(), handleTimeout);
-		}
-	}
-
-	private static class StartFilter extends SimpleCondition<Event> {
-		private static final long serialVersionUID = 5726188262756267490L;
-
-		@Override
-		public boolean filter(Event value) throws Exception {
-			return value.getName().equals("start");
-		}
-	}
-
-	private static class MiddleFilter extends SimpleCondition<SubEvent> {
-		private static final long serialVersionUID = 6215754202506583964L;
-
-		@Override
-		public boolean filter(SubEvent value) throws Exception {
-			return value.getVolume() > 5.0;
-		}
-	}
-
-	private static class EndFilter extends SimpleCondition<Event> {
-		private static final long serialVersionUID = 7056763917392056548L;
-
-		@Override
-		public boolean filter(Event value) throws Exception {
-			return value.getName().equals("end");
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigrationTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigrationTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigrationTest.java
index cf3c921..ed28f25 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigrationTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigrationTest.java
@@ -71,7 +71,7 @@ public class CEPMigrationTest {
 
 	@Parameterized.Parameters(name = "Migration Savepoint: {0}")
 	public static Collection<MigrationVersion> parameters() {
-		return Arrays.asList(MigrationVersion.v1_2, MigrationVersion.v1_3);
+		return Arrays.asList(MigrationVersion.v1_3);
 	}
 
 	public CEPMigrationTest(MigrationVersion migrateVersion) {
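CEPMigrationTest uses JUnit 4's Parameterized runner so the same restore test runs once per savepoint version, which is why trimming the version list is a one-line change. A minimal standalone sketch of that pattern (hypothetical test class, not from this commit):

import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

import java.util.Arrays;
import java.util.Collection;

import static org.junit.Assert.assertTrue;

@RunWith(Parameterized.class)
public class VersionedRestoreTest {

	@Parameterized.Parameters(name = "Migration Savepoint: {0}")
	public static Collection<String> parameters() {
		// Each entry becomes one run of every @Test method.
		return Arrays.asList("1.3", "1.4");
	}

	private final String migrateVersion;

	public VersionedRestoreTest(String migrateVersion) {
		this.migrateVersion = migrateVersion;
	}

	@Test
	public void testRestore() {
		// A real test would load the savepoint file for migrateVersion here.
		assertTrue(migrateVersion.startsWith("1."));
	}
}
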

http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-runtime/src/main/java/org/apache/flink/migration/MigrationNamespaceSerializerProxy.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/migration/MigrationNamespaceSerializerProxy.java b/flink-runtime/src/main/java/org/apache/flink/migration/MigrationNamespaceSerializerProxy.java
deleted file mode 100644
index c4e23ca..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/migration/MigrationNamespaceSerializerProxy.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.migration;
-
-import org.apache.flink.api.common.typeutils.CompatibilityResult;
-import org.apache.flink.api.common.typeutils.ParameterlessTypeSerializerConfig;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-
-import java.io.IOException;
-import java.io.Serializable;
-
-/**
- * The purpose of this class is to be filled in as a placeholder for the namespace serializer when migrating from
- * a Flink 1.1 savepoint (which did not include the namespace serializer) to Flink 1.2 (which must always include a
- * (non-null) namespace serializer). It is replaced as soon as the user re-registers the state for the first run
- * under Flink 1.2 and thereby provides the real namespace serializer again.
- *
- * @deprecated Internal class for savepoint backwards compatibility. Don't use for other purposes.
- */
-@Deprecated
-@SuppressWarnings("deprecation")
-public class MigrationNamespaceSerializerProxy extends TypeSerializer<Serializable> {
-
-	public static final MigrationNamespaceSerializerProxy INSTANCE = new MigrationNamespaceSerializerProxy();
-
-	private static final long serialVersionUID = -707800010807094491L;
-
-	private MigrationNamespaceSerializerProxy() {
-	}
-
-	@Override
-	public boolean isImmutableType() {
-		return false;
-	}
-
-	@Override
-	public TypeSerializer<Serializable> duplicate() {
-		return this;
-	}
-
-	@Override
-	public Serializable createInstance() {
-		throw new UnsupportedOperationException(
-			"This is just a proxy used during migration until the real type serializer is provided by the user.");
-	}
-
-	@Override
-	public Serializable copy(Serializable from) {
-		throw new UnsupportedOperationException(
-			"This is just a proxy used during migration until the real type serializer is provided by the user.");
-	}
-
-	@Override
-	public Serializable copy(Serializable from, Serializable reuse) {
-		throw new UnsupportedOperationException(
-			"This is just a proxy used during migration until the real type serializer is provided by the user.");
-	}
-
-	@Override
-	public int getLength() {
-		return -1;
-	}
-
-	@Override
-	public void serialize(Serializable record, DataOutputView target) throws IOException {
-		throw new UnsupportedOperationException(
-			"This is just a proxy used during migration until the real type serializer is provided by the user.");
-	}
-
-	@Override
-	public Serializable deserialize(DataInputView source) throws IOException {
-		throw new UnsupportedOperationException(
-			"This is just a proxy used during migration until the real type serializer is provided by the user.");
-	}
-
-	@Override
-	public Serializable deserialize(Serializable reuse, DataInputView source) throws IOException {
-		throw new UnsupportedOperationException(
-			"This is just a proxy used during migration until the real type serializer is provided by the user.");
-	}
-
-	@Override
-	public void copy(DataInputView source, DataOutputView target) throws IOException {
-		throw new UnsupportedOperationException(
-			"This is just a proxy used during migration until the real type serializer is provided by the user.");
-	}
-
-	@Override
-	public TypeSerializerConfigSnapshot snapshotConfiguration() {
-		return new ParameterlessTypeSerializerConfig(getClass().getCanonicalName());
-	}
-
-	@Override
-	public CompatibilityResult<Serializable> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
-		// always assume compatibility since we're just a proxy for migration
-		return CompatibilityResult.compatible();
-	}
-
-	@Override
-	public boolean equals(Object obj) {
-		return obj instanceof MigrationNamespaceSerializerProxy;
-	}
-
-	@Override
-	public boolean canEqual(Object obj) {
-		return true;
-	}
-
-	@Override
-	public int hashCode() {
-		return 42;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-runtime/src/main/java/org/apache/flink/migration/MigrationUtil.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/migration/MigrationUtil.java b/flink-runtime/src/main/java/org/apache/flink/migration/MigrationUtil.java
deleted file mode 100644
index a6055a8..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/migration/MigrationUtil.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.migration;
-
-import org.apache.flink.migration.state.MigrationKeyGroupStateHandle;
-import org.apache.flink.runtime.state.KeyedStateHandle;
-
-import java.util.Collection;
-
-/**
- * Utility functions for migration.
- */
-public class MigrationUtil {
-
-	@SuppressWarnings("deprecation")
-	public static boolean isOldSavepointKeyedState(Collection<KeyedStateHandle> keyedStateHandles) {
-		return (keyedStateHandles != null)
-				&& (keyedStateHandles.size() == 1)
-				&& (keyedStateHandles.iterator().next() instanceof MigrationKeyGroupStateHandle);
-	}
-
-}
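MigrationNamespaceSerializerProxy exposes a serializable singleton (INSTANCE) and makes equality instance-independent (instanceof-based equals, constant hashCode), which matters because plain Java deserialization otherwise produces fresh instances. The usual way to preserve true singleton identity across serialization is a readResolve hook; a generic sketch of that idiom (not from the Flink codebase):

import java.io.ObjectStreamException;
import java.io.Serializable;

public final class SerializableSingleton implements Serializable {
	private static final long serialVersionUID = 1L;

	public static final SerializableSingleton INSTANCE = new SerializableSingleton();

	private SerializableSingleton() {
	}

	/** Called by the serialization machinery; replaces the freshly deserialized copy. */
	private Object readResolve() throws ObjectStreamException {
		return INSTANCE;
	}
}
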

http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-runtime/src/main/java/org/apache/flink/migration/api/common/state/ListStateDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/migration/api/common/state/ListStateDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/migration/api/common/state/ListStateDescriptor.java
deleted file mode 100644
index 5196d2d..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/migration/api/common/state/ListStateDescriptor.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.migration.api.common.state;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.state.ListState;
-import org.apache.flink.api.common.state.StateBinder;
-import org.apache.flink.api.common.state.StateDescriptor;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-
-/**
- * The old version of the {@link org.apache.flink.api.common.state.ListStateDescriptor}, retained for
- * serialization backwards compatibility.
- *
- * @deprecated Internal class for savepoint backwards compatibility. Don't use for other purposes.
- */
-@Internal
-@Deprecated
-@SuppressWarnings("deprecation")
-public class ListStateDescriptor<T> extends StateDescriptor<ListState<T>, T> {
-	private static final long serialVersionUID = 1L;
-
-	/**
-	 * Creates a new {@code ListStateDescriptor} with the given name and list element type.
-	 *
-	 * <p>If this constructor fails (because it is not possible to describe the type via a class),
-	 * consider using the {@link #ListStateDescriptor(String, TypeInformation)} constructor.
-	 *
-	 * @param name The (unique) name for the state.
-	 * @param typeClass The type of the values in the state.
-	 */
-	public ListStateDescriptor(String name, Class<T> typeClass) {
-		super(name, typeClass, null);
-	}
-
-	/**
-	 * Creates a new {@code ListStateDescriptor} with the given name and list element type.
-	 *
-	 * @param name The (unique) name for the state.
-	 * @param typeInfo The type of the values in the state.
-	 */
-	public ListStateDescriptor(String name, TypeInformation<T> typeInfo) {
-		super(name, typeInfo, null);
-	}
-
-	/**
-	 * Creates a new {@code ListStateDescriptor} with the given name and list element type.
-	 *
-	 * @param name The (unique) name for the state.
-	 * @param typeSerializer The type serializer for the list values.
-	 */
-	public ListStateDescriptor(String name, TypeSerializer<T> typeSerializer) {
-		super(name, typeSerializer, null);
-	}
-
-	// ------------------------------------------------------------------------
-
-	@Override
-	public ListState<T> bind(StateBinder stateBinder) throws Exception {
-		throw new IllegalStateException("Cannot bind states with a legacy state descriptor.");
-	}
-
-	@Override
-	public boolean equals(Object o) {
-		if (this == o) {
-			return true;
-		}
-		if (o == null || getClass() != o.getClass()) {
-			return false;
-		}
-
-		ListStateDescriptor<?> that = (ListStateDescriptor<?>) o;
-
-		return serializer.equals(that.serializer) && name.equals(that.name);
-
-	}
-
-	@Override
-	public int hashCode() {
-		int result = serializer.hashCode();
-		result = 31 * result + name.hashCode();
-		return result;
-	}
-
-	@Override
-	public String toString() {
-		return "ListStateDescriptor{" +
-				"serializer=" + serializer +
-				'}';
-	}
-
-	@Override
-	public org.apache.flink.api.common.state.StateDescriptor.Type getType() {
-		return org.apache.flink.api.common.state.StateDescriptor.Type.LIST;
-	}
-}
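The deleted javadoc hints at why the descriptor has both a Class-based and a TypeInformation-based constructor: a plain Class cannot describe a generic element type. A short usage sketch with the current (non-migration) descriptor class, assuming the org.apache.flink.api.common.state and typeinfo APIs as commonly published:

import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;

import java.util.Map;

public class DescriptorExamples {
	public static void main(String[] args) {
		// Simple element type: a Class is enough.
		ListStateDescriptor<Long> counts =
			new ListStateDescriptor<>("counts", Long.class);

		// Generic element type: a Class cannot capture it, so use TypeInformation.
		ListStateDescriptor<Map<String, Long>> maps =
			new ListStateDescriptor<>(
				"maps", TypeInformation.of(new TypeHint<Map<String, Long>>() {}));

		System.out.println(counts.getName() + ", " + maps.getName());
	}
}
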
- */ - -package org.apache.flink.migration.runtime.checkpoint; - -import org.apache.flink.migration.runtime.state.StateHandle; -import org.apache.flink.migration.util.SerializedValue; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.Serializable; - -/** - * Simple container class which contains the serialized state handle for a key group. - * - * The key group state handle is kept in serialized form because it can contain user code classes - * which might not be available on the JobManager. - * - * @deprecated Internal class for savepoint backwards compatibility. Don't use for other purposes. - */ -@Deprecated -@SuppressWarnings("deprecation") -public class KeyGroupState implements Serializable { - private static final long serialVersionUID = -5926696455438467634L; - - private static final Logger LOG = LoggerFactory.getLogger(KeyGroupState.class); - - private final SerializedValue> keyGroupState; - - private final long stateSize; - - private final long duration; - - public KeyGroupState(SerializedValue> keyGroupState, long stateSize, long duration) { - this.keyGroupState = keyGroupState; - - this.stateSize = stateSize; - - this.duration = duration; - } - - public SerializedValue> getKeyGroupState() { - return keyGroupState; - } - - public long getDuration() { - return duration; - } - - public long getStateSize() { - return stateSize; - } - - @Override - public boolean equals(Object obj) { - if (obj instanceof KeyGroupState) { - KeyGroupState other = (KeyGroupState) obj; - - return keyGroupState.equals(other.keyGroupState) && stateSize == other.stateSize && - duration == other.duration; - } else { - return false; - } - } - - @Override - public int hashCode() { - return (int) (this.stateSize ^ this.stateSize >>> 32) + - 31 * ((int) (this.duration ^ this.duration >>> 32) + - 31 * keyGroupState.hashCode()); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/SubtaskState.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/SubtaskState.java b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/SubtaskState.java deleted file mode 100644 index d42d146..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/SubtaskState.java +++ /dev/null @@ -1,108 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/SubtaskState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/SubtaskState.java b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/SubtaskState.java
deleted file mode 100644
index d42d146..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/SubtaskState.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.migration.runtime.checkpoint;
-
-import org.apache.flink.migration.runtime.state.StateHandle;
-import org.apache.flink.migration.util.SerializedValue;
-
-import java.io.Serializable;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * @deprecated Internal class for savepoint backwards compatibility. Don't use for other purposes.
- */
-@Deprecated
-@SuppressWarnings("deprecation")
-public class SubtaskState implements Serializable {
-
-    private static final long serialVersionUID = -2394696997971923995L;
-
-    /** The state of the parallel operator */
-    private final SerializedValue<StateHandle<?>> state;
-
-    /**
-     * The state size. This is also part of the deserialized state handle.
-     * We store it here in order to not deserialize the state handle when
-     * gathering stats.
-     */
-    private final long stateSize;
-
-    /** The duration of the acknowledged (ack timestamp - trigger timestamp). */
-    private final long duration;
-
-    public SubtaskState(
-            SerializedValue<StateHandle<?>> state,
-            long stateSize,
-            long duration) {
-
-        this.state = checkNotNull(state, "State");
-        // Sanity check and don't fail checkpoint because of this.
-        this.stateSize = stateSize >= 0 ? stateSize : 0;
-
-        this.duration = duration;
-    }
-
-    // --------------------------------------------------------------------------------------------
-
-    public SerializedValue<StateHandle<?>> getState() {
-        return state;
-    }
-
-    public long getStateSize() {
-        return stateSize;
-    }
-
-    public long getDuration() {
-        return duration;
-    }
-
-    public void discard(ClassLoader userClassLoader) throws Exception {
-
-    }
-
-    // --------------------------------------------------------------------------------------------
-
-    @Override
-    public boolean equals(Object o) {
-        if (this == o) {
-            return true;
-        }
-        else if (o instanceof SubtaskState) {
-            SubtaskState that = (SubtaskState) o;
-            return this.state.equals(that.state) && stateSize == that.stateSize &&
-                duration == that.duration;
-        }
-        else {
-            return false;
-        }
-    }
-
-    @Override
-    public int hashCode() {
-        return (int) (this.stateSize ^ this.stateSize >>> 32) +
-            31 * ((int) (this.duration ^ this.duration >>> 32) +
-                31 * state.hashCode());
-    }
-
-    @Override
-    public String toString() {
-        return String.format("SubtaskState(Size: %d, Duration: %d, State: %s)", stateSize, duration, state);
-    }
-}
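The hashCode above folds each long field with (int) (v ^ (v >>> 32)), which is exactly what
Long.hashCode(long) computes since Java 8. A quick self-contained check of that equivalence:

    public class LongHashCheck {
        public static void main(String[] args) {
            long stateSize = 123_456_789_012_345L;
            long duration = 42L;

            // Manual folding as used in SubtaskState#hashCode ...
            int manual = (int) (stateSize ^ (stateSize >>> 32));

            // ... is identical to the JDK helper.
            System.out.println(manual == Long.hashCode(stateSize)); // true
            System.out.println((int) (duration ^ (duration >>> 32)) == Long.hashCode(duration)); // true
        }
    }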
http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/TaskState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/TaskState.java b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/TaskState.java
deleted file mode 100644
index c0a7b2d..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/TaskState.java
+++ /dev/null
@@ -1,164 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.migration.runtime.checkpoint;
-
-import org.apache.flink.migration.runtime.state.StateHandle;
-import org.apache.flink.migration.util.SerializedValue;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-
-import java.io.Serializable;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
-
-/**
- * @deprecated Internal class for savepoint backwards compatibility. Don't use for other purposes.
- */
-@Deprecated
-@SuppressWarnings("deprecation")
-public class TaskState implements Serializable {
-
-    private static final long serialVersionUID = -4845578005863201810L;
-
-    private final JobVertexID jobVertexID;
-
-    /** Map of task states which can be accessed by their sub task index */
-    private final Map<Integer, SubtaskState> subtaskStates;
-
-    /** Map of key-value states which can be accessed by their key group index */
-    private final Map<Integer, KeyGroupState> kvStates;
-
-    /** Parallelism of the operator when it was checkpointed */
-    private final int parallelism;
-
-    public TaskState(JobVertexID jobVertexID, int parallelism) {
-        this.jobVertexID = jobVertexID;
-
-        this.subtaskStates = new HashMap<>(parallelism);
-
-        this.kvStates = new HashMap<>();
-
-        this.parallelism = parallelism;
-    }
-
-    public JobVertexID getJobVertexID() {
-        return jobVertexID;
-    }
-
-    public void putState(int subtaskIndex, SubtaskState subtaskState) {
-        if (subtaskIndex < 0 || subtaskIndex >= parallelism) {
-            throw new IndexOutOfBoundsException("The given sub task index " + subtaskIndex +
-                " exceeds the maximum number of sub tasks " + subtaskStates.size());
-        } else {
-            subtaskStates.put(subtaskIndex, subtaskState);
-        }
-    }
-
-    public SubtaskState getState(int subtaskIndex) {
-        if (subtaskIndex < 0 || subtaskIndex >= parallelism) {
-            throw new IndexOutOfBoundsException("The given sub task index " + subtaskIndex +
                " exceeds the maximum number of sub tasks " + subtaskStates.size());
-        } else {
-            return subtaskStates.get(subtaskIndex);
-        }
-    }
-
-    public Collection<SubtaskState> getStates() {
-        return subtaskStates.values();
-    }
-
-    public Map<Integer, SubtaskState> getSubtaskStatesById() {
-        return subtaskStates;
-    }
-
-    public long getStateSize() {
-        long result = 0L;
-
-        for (SubtaskState subtaskState : subtaskStates.values()) {
-            result += subtaskState.getStateSize();
-        }
-
-        for (KeyGroupState keyGroupState : kvStates.values()) {
-            result += keyGroupState.getStateSize();
-        }
-
-        return result;
-    }
-
-    public int getNumberCollectedStates() {
-        return subtaskStates.size();
-    }
-
-    public int getParallelism() {
-        return parallelism;
-    }
-
-    public void putKvState(int keyGroupId, KeyGroupState keyGroupState) {
-        kvStates.put(keyGroupId, keyGroupState);
-    }
-
-    public KeyGroupState getKvState(int keyGroupId) {
-        return kvStates.get(keyGroupId);
-    }
-
-    /**
-     * Retrieve the set of key-value state key groups specified by the given key group partition set.
-     * The key groups are returned as a map where the key group index maps to the serialized state
-     * handle of the key group.
-     *
-     * @param keyGroupPartition Set of key group indices
-     * @return Map of serialized key group state handles indexed by their key group index.
-     */
-    public Map<Integer, SerializedValue<StateHandle<?>>> getUnwrappedKvStates(Set<Integer> keyGroupPartition) {
-        HashMap<Integer, SerializedValue<StateHandle<?>>> result = new HashMap<>(keyGroupPartition.size());
-
-        for (Integer keyGroupId : keyGroupPartition) {
-            KeyGroupState keyGroupState = kvStates.get(keyGroupId);
-
-            if (keyGroupState != null) {
-                result.put(keyGroupId, kvStates.get(keyGroupId).getKeyGroupState());
-            }
-        }
-
-        return result;
-    }
-
-    public int getNumberCollectedKvStates() {
-        return kvStates.size();
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-        if (obj instanceof TaskState) {
-            TaskState other = (TaskState) obj;
-
-            return jobVertexID.equals(other.jobVertexID) && parallelism == other.parallelism &&
-                subtaskStates.equals(other.subtaskStates) && kvStates.equals(other.kvStates);
-        } else {
-            return false;
-        }
-    }
-
-    @Override
-    public int hashCode() {
-        return parallelism + 31 * Objects.hash(jobVertexID, subtaskStates, kvStates);
-    }
-}
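getUnwrappedKvStates above walks the requested key-group partition and keeps only the groups
for which state exists; note that it looks up kvStates.get(keyGroupId) a second time even though
the keyGroupState local already holds that value. A generic sketch of the same filtering idiom
with the duplicate lookup folded away (the helper and class names are made up for illustration):

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;

    public class KeyGroupFilterSketch {

        /** Keep only the entries of kvStates whose key group is in the partition. */
        static <V> Map<Integer, V> filterByKeyGroups(Map<Integer, V> kvStates, Set<Integer> partition) {
            Map<Integer, V> result = new HashMap<>(partition.size());
            for (Integer keyGroupId : partition) {
                V state = kvStates.get(keyGroupId);
                if (state != null) {
                    result.put(keyGroupId, state); // reuse the value already looked up
                }
            }
            return result;
        }

        public static void main(String[] args) {
            Map<Integer, String> kvStates = new HashMap<>();
            kvStates.put(0, "kg-0");
            kvStates.put(2, "kg-2");

            Set<Integer> partition = new HashSet<>(Arrays.asList(0, 1, 2));
            System.out.println(filterByKeyGroups(kvStates, partition)); // {0=kg-0, 2=kg-2}
        }
    }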
http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/savepoint/SavepointV0.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/savepoint/SavepointV0.java b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/savepoint/SavepointV0.java
deleted file mode 100644
index 7888d2f..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/savepoint/SavepointV0.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.migration.runtime.checkpoint.savepoint;
-
-import org.apache.flink.migration.runtime.checkpoint.TaskState;
-import org.apache.flink.runtime.checkpoint.MasterState;
-import org.apache.flink.runtime.checkpoint.OperatorState;
-import org.apache.flink.runtime.checkpoint.savepoint.Savepoint;
-import org.apache.flink.util.Preconditions;
-
-import java.util.Collection;
-
-/**
- * Savepoint version 0.
- *
- * <p>This format was introduced with Flink 1.1.0.
- */
-@SuppressWarnings("deprecation")
-public class SavepointV0 implements Savepoint {
-
-    /** The savepoint version. */
-    public static final int VERSION = 0;
-
-    /** The checkpoint ID */
-    private final long checkpointId;
-
-    /** The task states */
-    private final Collection<TaskState> taskStates;
-
-    public SavepointV0(long checkpointId, Collection<TaskState> taskStates) {
-        this.checkpointId = checkpointId;
-        this.taskStates = Preconditions.checkNotNull(taskStates, "Task States");
-    }
-
-    @Override
-    public int getVersion() {
-        return VERSION;
-    }
-
-    @Override
-    public long getCheckpointId() {
-        return checkpointId;
-    }
-
-    @Override
-    public Collection<org.apache.flink.runtime.checkpoint.TaskState> getTaskStates() {
-        // since checkpoints are never deserialized into this format,
-        // this method should never be called
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public Collection<MasterState> getMasterStates() {
-        // since checkpoints are never deserialized into this format,
-        // this method should never be called
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public Collection<OperatorState> getOperatorStates() {
-        return null;
-    }
-
-    @Override
-    public void dispose() throws Exception {
-        //NOP
-    }
-
-    public Collection<TaskState> getOldTaskStates() {
-        return taskStates;
-    }
-
-    @Override
-    public String toString() {
-        return "Savepoint(version=" + VERSION + ")";
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (this == o) {
-            return true;
-        }
-
-        if (o == null || getClass() != o.getClass()) {
-            return false;
-        }
-
-        SavepointV0 that = (SavepointV0) o;
-        return checkpointId == that.checkpointId && getTaskStates().equals(that.getTaskStates());
-    }
-
-    @Override
-    public int hashCode() {
-        int result = (int) (checkpointId ^ (checkpointId >>> 32));
-        result = 31 * result + taskStates.hashCode();
-        return result;
-    }
-}
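SavepointV0 is essentially a version tag plus the old task-state payload: getVersion() returns
the constant 0 and the new-format accessors throw. A hypothetical sketch (not Flink API) of how
a loader can dispatch on such version constants; the format descriptions are illustrative only:

    public class SavepointVersionSketch {

        static String describe(int version) {
            switch (version) {
                case 0:  return "legacy 1.1.x format (support removed by this commit)";
                case 1:  return "1.2.x format";
                default: return "unrecognized savepoint version: " + version;
            }
        }

        public static void main(String[] args) {
            System.out.println(describe(0)); // legacy 1.1.x format (support removed by this commit)
        }
    }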